001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
023import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
024import static org.junit.Assert.assertArrayEquals;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertFalse;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertNull;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031import static org.mockito.ArgumentMatchers.any;
032import static org.mockito.ArgumentMatchers.anyLong;
033import static org.mockito.Mockito.doThrow;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.never;
036import static org.mockito.Mockito.spy;
037import static org.mockito.Mockito.times;
038import static org.mockito.Mockito.verify;
039import static org.mockito.Mockito.when;
040
041import java.io.IOException;
042import java.io.InterruptedIOException;
043import java.math.BigDecimal;
044import java.nio.charset.StandardCharsets;
045import java.security.PrivilegedExceptionAction;
046import java.util.ArrayList;
047import java.util.Arrays;
048import java.util.Collection;
049import java.util.List;
050import java.util.Map;
051import java.util.NavigableMap;
052import java.util.Objects;
053import java.util.TreeMap;
054import java.util.concurrent.Callable;
055import java.util.concurrent.CountDownLatch;
056import java.util.concurrent.ExecutorService;
057import java.util.concurrent.Executors;
058import java.util.concurrent.Future;
059import java.util.concurrent.TimeUnit;
060import java.util.concurrent.atomic.AtomicBoolean;
061import java.util.concurrent.atomic.AtomicInteger;
062import java.util.concurrent.atomic.AtomicReference;
063import org.apache.commons.lang3.RandomStringUtils;
064import org.apache.hadoop.conf.Configuration;
065import org.apache.hadoop.fs.FSDataOutputStream;
066import org.apache.hadoop.fs.FileStatus;
067import org.apache.hadoop.fs.FileSystem;
068import org.apache.hadoop.fs.Path;
069import org.apache.hadoop.hbase.ArrayBackedTag;
070import org.apache.hadoop.hbase.Cell;
071import org.apache.hadoop.hbase.Cell.Type;
072import org.apache.hadoop.hbase.CellBuilderFactory;
073import org.apache.hadoop.hbase.CellBuilderType;
074import org.apache.hadoop.hbase.CellUtil;
075import org.apache.hadoop.hbase.CompareOperator;
076import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
077import org.apache.hadoop.hbase.DroppedSnapshotException;
078import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
079import org.apache.hadoop.hbase.HBaseClassTestRule;
080import org.apache.hadoop.hbase.HBaseConfiguration;
081import org.apache.hadoop.hbase.HBaseTestingUtility;
082import org.apache.hadoop.hbase.HColumnDescriptor;
083import org.apache.hadoop.hbase.HConstants;
084import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
085import org.apache.hadoop.hbase.HDFSBlocksDistribution;
086import org.apache.hadoop.hbase.HRegionInfo;
087import org.apache.hadoop.hbase.HTableDescriptor;
088import org.apache.hadoop.hbase.KeyValue;
089import org.apache.hadoop.hbase.MiniHBaseCluster;
090import org.apache.hadoop.hbase.MultithreadedTestUtil;
091import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
092import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
093import org.apache.hadoop.hbase.NotServingRegionException;
094import org.apache.hadoop.hbase.PrivateCellUtil;
095import org.apache.hadoop.hbase.RegionTooBusyException;
096import org.apache.hadoop.hbase.ServerName;
097import org.apache.hadoop.hbase.StartMiniClusterOption;
098import org.apache.hadoop.hbase.TableName;
099import org.apache.hadoop.hbase.TagType;
100import org.apache.hadoop.hbase.Waiter;
101import org.apache.hadoop.hbase.client.Append;
102import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
103import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
104import org.apache.hadoop.hbase.client.Delete;
105import org.apache.hadoop.hbase.client.Durability;
106import org.apache.hadoop.hbase.client.Get;
107import org.apache.hadoop.hbase.client.Increment;
108import org.apache.hadoop.hbase.client.Mutation;
109import org.apache.hadoop.hbase.client.Put;
110import org.apache.hadoop.hbase.client.RegionInfo;
111import org.apache.hadoop.hbase.client.RegionInfoBuilder;
112import org.apache.hadoop.hbase.client.Result;
113import org.apache.hadoop.hbase.client.RowMutations;
114import org.apache.hadoop.hbase.client.Scan;
115import org.apache.hadoop.hbase.client.Table;
116import org.apache.hadoop.hbase.client.TableDescriptor;
117import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
118import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
119import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
120import org.apache.hadoop.hbase.filter.BigDecimalComparator;
121import org.apache.hadoop.hbase.filter.BinaryComparator;
122import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
123import org.apache.hadoop.hbase.filter.Filter;
124import org.apache.hadoop.hbase.filter.FilterBase;
125import org.apache.hadoop.hbase.filter.FilterList;
126import org.apache.hadoop.hbase.filter.NullComparator;
127import org.apache.hadoop.hbase.filter.PrefixFilter;
128import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
129import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
130import org.apache.hadoop.hbase.filter.SubstringComparator;
131import org.apache.hadoop.hbase.filter.ValueFilter;
132import org.apache.hadoop.hbase.io.hfile.HFile;
133import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
134import org.apache.hadoop.hbase.monitoring.MonitoredTask;
135import org.apache.hadoop.hbase.monitoring.TaskMonitor;
136import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
137import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
138import org.apache.hadoop.hbase.regionserver.Region.RowLock;
139import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
140import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
141import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
142import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
143import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
144import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
145import org.apache.hadoop.hbase.security.User;
146import org.apache.hadoop.hbase.test.MetricsAssertHelper;
147import org.apache.hadoop.hbase.testclassification.LargeTests;
148import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
149import org.apache.hadoop.hbase.util.Bytes;
150import org.apache.hadoop.hbase.util.CommonFSUtils;
151import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
152import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
153import org.apache.hadoop.hbase.util.FSUtils;
154import org.apache.hadoop.hbase.util.HFileArchiveUtil;
155import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
156import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
157import org.apache.hadoop.hbase.util.Threads;
158import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
159import org.apache.hadoop.hbase.wal.FaultyFSLog;
160import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
161import org.apache.hadoop.hbase.wal.WAL;
162import org.apache.hadoop.hbase.wal.WALEdit;
163import org.apache.hadoop.hbase.wal.WALFactory;
164import org.apache.hadoop.hbase.wal.WALKeyImpl;
165import org.apache.hadoop.hbase.wal.WALProvider;
166import org.apache.hadoop.hbase.wal.WALProvider.Writer;
167import org.apache.hadoop.hbase.wal.WALSplitUtil;
168import org.junit.After;
169import org.junit.Assert;
170import org.junit.Before;
171import org.junit.ClassRule;
172import org.junit.Rule;
173import org.junit.Test;
174import org.junit.experimental.categories.Category;
175import org.junit.rules.ExpectedException;
176import org.junit.rules.TestName;
177import org.mockito.ArgumentCaptor;
178import org.mockito.ArgumentMatcher;
179import org.mockito.Mockito;
180import org.mockito.invocation.InvocationOnMock;
181import org.mockito.stubbing.Answer;
182import org.slf4j.Logger;
183import org.slf4j.LoggerFactory;
184
185import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
186import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
187import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
188import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
189import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
190
191import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
198
199/**
200 * Basic stand-alone testing of HRegion.  No clusters!
201 *
202 * A lot of the meta information for an HRegion now lives inside other HRegions
203 * or in the HBaseMaster, so only basic testing is possible.
204 */
205@Category({VerySlowRegionServerTests.class, LargeTests.class})
206@SuppressWarnings("deprecation")
207public class TestHRegion {
208
209  @ClassRule
210  public static final HBaseClassTestRule CLASS_RULE =
211      HBaseClassTestRule.forClass(TestHRegion.class);
212
213  // Do not spin up clusters in here. If you need to spin up a cluster, do it
214  // over in TestHRegionOnCluster.
215  private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class);
216  @Rule
217  public TestName name = new TestName();
218  @Rule public final ExpectedException thrown = ExpectedException.none();
219
220  private static final String COLUMN_FAMILY = "MyCF";
221  private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
222  private static final EventLoopGroup GROUP = new NioEventLoopGroup();
223
224  HRegion region = null;
  // Do not run unit tests in parallel (? Why not? Does it not work? Why not? St.Ack)
226  protected static HBaseTestingUtility TEST_UTIL;
  public static Configuration CONF;
228  private String dir;
229  private final int MAX_VERSIONS = 2;
230
231  // Test names
232  protected TableName tableName;
233  protected String method;
234  protected final byte[] qual = Bytes.toBytes("qual");
235  protected final byte[] qual1 = Bytes.toBytes("qual1");
236  protected final byte[] qual2 = Bytes.toBytes("qual2");
237  protected final byte[] qual3 = Bytes.toBytes("qual3");
238  protected final byte[] value = Bytes.toBytes("value");
239  protected final byte[] value1 = Bytes.toBytes("value1");
240  protected final byte[] value2 = Bytes.toBytes("value2");
241  protected final byte[] row = Bytes.toBytes("rowA");
242  protected final byte[] row2 = Bytes.toBytes("rowB");
243
244  protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory
245      .getInstance(MetricsAssertHelper.class);
246
247  @Before
248  public void setup() throws IOException {
249    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
250    CONF = TEST_UTIL.getConfiguration();
251    NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, GROUP, NioSocketChannel.class);
252    dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
253    method = name.getMethodName();
254    tableName = TableName.valueOf(method);
255    CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09));
256  }
257
258  @After
259  public void tearDown() throws IOException {
    // Region may have been closed, but there is no harm in closing it again here using HTU.
261    HBaseTestingUtility.closeRegionAndWAL(region);
262    EnvironmentEdgeManagerTestHelper.reset();
263    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
264    TEST_UTIL.cleanupTestDir();
265  }
266
  /**
   * Test that the max flushed sequence id is still usable after the region has been closed.
   */
271  @Test
272  public void testSequenceId() throws IOException {
273    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
274    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
275    // Weird. This returns 0 if no store files or no edits. Afraid to change it.
276    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
277    HBaseTestingUtility.closeRegionAndWAL(this.region);
278    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
279    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
280    // Open region again.
281    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
282    byte [] value = Bytes.toBytes(method);
283    // Make a random put against our cf.
284    Put put = new Put(value);
285    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
286    region.put(put);
287    // No flush yet so init numbers should still be in place.
288    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
289    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
290    region.flush(true);
291    long max = region.getMaxFlushedSeqId();
292    HBaseTestingUtility.closeRegionAndWAL(this.region);
293    assertEquals(max, region.getMaxFlushedSeqId());
294    this.region = null;
295  }
296
297  /**
298   * Test for Bug 2 of HBASE-10466.
299   * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
300   * is smaller than a certain value, or when region close starts a flush is ongoing, the first
301   * flush is skipped and only the second flush takes place. However, two flushes are required in
302   * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
303   * in current memstore. The fix is removing all conditions except abort check so we ensure 2
304   * flushes for region close."
306   */
307  @Test
308  public void testCloseCarryingSnapshot() throws IOException {
309    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
310    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
311    // Get some random bytes.
312    byte [] value = Bytes.toBytes(method);
313    // Make a random put against our cf.
314    Put put = new Put(value);
315    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
316    // First put something in current memstore, which will be in snapshot after flusher.prepare()
317    region.put(put);
318    StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
319    storeFlushCtx.prepare();
320    // Second put something in current memstore
321    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
322    region.put(put);
323    // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
324    HBaseTestingUtility.closeRegionAndWAL(region);
325    assertEquals(0, region.getMemStoreDataSize());
326    region = null;
327  }
328
  /**
   * Verifies that the memstore snapshot size is correctly updated in case of a rollback.
   * See HBASE-10845.
   */
333  @Test
334  public void testMemstoreSnapshotSize() throws IOException {
335    class MyFaultyFSLog extends FaultyFSLog {
336      StoreFlushContext storeFlushCtx;
337      public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
338          throws IOException {
339        super(fs, rootDir, logName, conf);
340      }
341
342      void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
343        this.storeFlushCtx = storeFlushCtx;
344      }
345
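      // Called during WAL sync: take the flush snapshot while the put is still in flight so
      // that the sync failure injected by the test exercises the memstore snapshot accounting
      // described in HBASE-10845.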
346      @Override
347      public void sync(long txid) throws IOException {
348        storeFlushCtx.prepare();
349        super.sync(txid);
350      }
351    }
352
353    FileSystem fs = FileSystem.get(CONF);
354    Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
355    MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
356    faultyLog.init();
357    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog,
358        COLUMN_FAMILY_BYTES);
359
360    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
361    // Get some random bytes.
362    byte [] value = Bytes.toBytes(method);
363    faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY));
364
365    Put put = new Put(value);
366    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
367    faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
368    boolean threwIOE = false;
369    try {
370      region.put(put);
371    } catch (IOException ioe) {
372      threwIOE = true;
373    } finally {
374      assertTrue("The regionserver should have thrown an exception", threwIOE);
375    }
376    MemStoreSize mss = store.getFlushableSize();
377    assertTrue("flushable size should be zero, but it is " + mss,
378        mss.getDataSize() == 0);
379  }
380
381  /**
382   * Create a WAL outside of the usual helper in
383   * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method
384   * doesn't play nicely with FaultyFileSystem. Call this method before overriding
385   * {@code fs.file.impl}.
386   * @param callingMethod a unique component for the path, probably the name of the test method.
387   */
388  private static WAL createWALCompatibleWithFaultyFileSystem(String callingMethod,
389      Configuration conf, TableName tableName) throws IOException {
390    final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
391    final Configuration walConf = new Configuration(conf);
392    FSUtils.setRootDir(walConf, logDir);
393    return new WALFactory(walConf, callingMethod)
394        .getWAL(RegionInfoBuilder.newBuilder(tableName).build());
395  }
396
397  @Test
398  public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
399    String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
400    FileSystem fs = FileSystem.get(CONF);
401    Path rootDir = new Path(dir + testName);
402    FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
403    hLog.init();
404    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
405        COLUMN_FAMILY_BYTES);
406    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
407    assertEquals(0, region.getMemStoreDataSize());
408
409    // Put one value
410    byte [] value = Bytes.toBytes(method);
411    Put put = new Put(value);
412    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
413    region.put(put);
414    long onePutSize = region.getMemStoreDataSize();
415    assertTrue(onePutSize > 0);
416
417    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
418    doThrow(new IOException())
419       .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
420    region.setCoprocessorHost(mockedCPHost);
421
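    // The put below reaches the memstore before postBatchMutate runs, so even though the
    // mocked hook throws, the memstore and flushable sizes must still account for it (see the
    // assertions below).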
422    put = new Put(value);
423    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
424    try {
425      region.put(put);
426      fail("Should have failed with IOException");
427    } catch (IOException expected) {
428    }
429    long expectedSize = onePutSize * 2;
430    assertEquals("memstoreSize should be incremented",
431        expectedSize, region.getMemStoreDataSize());
432    assertEquals("flushable size should be incremented",
433        expectedSize, store.getFlushableSize().getDataSize());
434
435    region.setCoprocessorHost(null);
436  }
437
  /**
   * A test case for HBASE-21041: verify that memstore data, heap, and off-heap sizes are
   * accounted correctly after a flush.
   */
442  @Test
443  public void testFlushAndMemstoreSizeCounting() throws Exception {
444    byte[] family = Bytes.toBytes("family");
445    this.region = initHRegion(tableName, method, CONF, family);
446    final WALFactory wals = new WALFactory(CONF, method);
447    try {
448      for (byte[] row : HBaseTestingUtility.ROWS) {
449        Put put = new Put(row);
450        put.addColumn(family, family, row);
451        region.put(put);
452      }
453      region.flush(true);
454      // After flush, data size should be zero
455      assertEquals(0, region.getMemStoreDataSize());
456      // After flush, a new active mutable segment is created, so the heap size
      // should equal MutableSegment.DEEP_OVERHEAD
458      assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
459      // After flush, offheap should be zero
460      assertEquals(0, region.getMemStoreOffHeapSize());
461    } finally {
462      HBaseTestingUtility.closeRegionAndWAL(this.region);
463      this.region = null;
464      wals.close();
465    }
466  }
467
468  /**
469   * Test we do not lose data if we fail a flush and then close.
   * Part of HBASE-10466.  Tests the following from the issue description:
471   * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
472   * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
473   * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
474   * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
475   * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
476   * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
477   * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
478   * much smaller than expected. In extreme case, if the error accumulates to even bigger than
479   * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
480   * if memstoreSize is not larger than 0."
482   */
483  @Test
484  public void testFlushSizeAccounting() throws Exception {
485    final Configuration conf = HBaseConfiguration.create(CONF);
486    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
487    // Only retry once.
488    conf.setInt("hbase.hstore.flush.retries.number", 1);
489    final User user =
490      User.createUserForTesting(conf, method, new String[]{"foo"});
491    // Inject our faulty LocalFileSystem
492    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
493    user.runAs(new PrivilegedExceptionAction<Object>() {
494      @Override
495      public Object run() throws Exception {
496        // Make sure it worked (above is sensitive to caching details in hadoop core)
497        FileSystem fs = FileSystem.get(conf);
498        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
499        FaultyFileSystem ffs = (FaultyFileSystem)fs;
500        HRegion region = null;
501        try {
502          // Initialize region
503          region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal,
504              COLUMN_FAMILY_BYTES);
505          long size = region.getMemStoreDataSize();
506          Assert.assertEquals(0, size);
507          // Put one item into memstore.  Measure the size of one item in memstore.
508          Put p1 = new Put(row);
509          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null));
510          region.put(p1);
511          final long sizeOfOnePut = region.getMemStoreDataSize();
512          // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
513          try {
514            LOG.info("Flushing");
515            region.flush(true);
516            Assert.fail("Didn't bubble up IOE!");
517          } catch (DroppedSnapshotException dse) {
518            // What we are expecting
519            region.closing.set(false); // this is needed for the rest of the test to work
520          }
521          // Make it so all writes succeed from here on out
522          ffs.fault.set(false);
523          // Check sizes.  Should still be the one entry.
524          Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize());
525          // Now add two entries so that on this next flush that fails, we can see if we
526          // subtract the right amount, the snapshot size only.
527          Put p2 = new Put(row);
528          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
529          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
530          region.put(p2);
531          long expectedSize = sizeOfOnePut * 3;
532          Assert.assertEquals(expectedSize, region.getMemStoreDataSize());
          // Do a successful flush.  It will clear the snapshot only.  That's how flushes work.
534          // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
535          // it
536          region.flush(true);
537          // Make sure our memory accounting is right.
538          Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize());
539        } finally {
540          HBaseTestingUtility.closeRegionAndWAL(region);
541        }
542        return null;
543      }
544    });
545    FileSystem.closeAllForUGI(user.getUGI());
546  }
547
548  @Test
549  public void testCloseWithFailingFlush() throws Exception {
550    final Configuration conf = HBaseConfiguration.create(CONF);
551    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
552    // Only retry once.
553    conf.setInt("hbase.hstore.flush.retries.number", 1);
554    final User user =
555      User.createUserForTesting(conf, this.method, new String[]{"foo"});
556    // Inject our faulty LocalFileSystem
557    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
558    user.runAs(new PrivilegedExceptionAction<Object>() {
559      @Override
560      public Object run() throws Exception {
561        // Make sure it worked (above is sensitive to caching details in hadoop core)
562        FileSystem fs = FileSystem.get(conf);
563        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
564        FaultyFileSystem ffs = (FaultyFileSystem)fs;
565        HRegion region = null;
566        try {
567          // Initialize region
568          region = initHRegion(tableName, null, null, false,
569              Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
570          long size = region.getMemStoreDataSize();
571          Assert.assertEquals(0, size);
572          // Put one item into memstore.  Measure the size of one item in memstore.
573          Put p1 = new Put(row);
574          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
575          region.put(p1);
576          // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
577          HStore store = region.getStore(COLUMN_FAMILY_BYTES);
578          StoreFlushContext storeFlushCtx =
579              store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
580          storeFlushCtx.prepare();
581          // Now add two entries to the foreground memstore.
582          Put p2 = new Put(row);
583          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
584          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
585          region.put(p2);
586          // Now try close on top of a failing flush.
587          HBaseTestingUtility.closeRegionAndWAL(region);
588          region = null;
589          fail();
590        } catch (DroppedSnapshotException dse) {
591          // Expected
592          LOG.info("Expected DroppedSnapshotException");
593        } finally {
          // Make it so all writes succeed from here on out so we can close cleanly
595          ffs.fault.set(false);
596          HBaseTestingUtility.closeRegionAndWAL(region);
597        }
598        return null;
599      }
600    });
601    FileSystem.closeAllForUGI(user.getUGI());
602  }
603
604  @Test
605  public void testCompactionAffectedByScanners() throws Exception {
606    byte[] family = Bytes.toBytes("family");
607    this.region = initHRegion(tableName, method, CONF, family);
608
609    Put put = new Put(Bytes.toBytes("r1"));
610    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
611    region.put(put);
612    region.flush(true);
613
614    Scan scan = new Scan();
615    scan.setMaxVersions(3);
616    // open the first scanner
617    RegionScanner scanner1 = region.getScanner(scan);
618
619    Delete delete = new Delete(Bytes.toBytes("r1"));
620    region.delete(delete);
621    region.flush(true);
622
623    // open the second scanner
624    RegionScanner scanner2 = region.getScanner(scan);
625
626    List<Cell> results = new ArrayList<>();
627
628    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
629
630    // make a major compaction
631    region.compact(true);
632
633    // open the third scanner
634    RegionScanner scanner3 = region.getScanner(scan);
635
636    // get data from scanner 1, 2, 3 after major compaction
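    // scanner1 was opened before the delete, so the major compaction must keep the cell it can
    // still see; scanner2 and scanner3 were opened after the delete and see nothing.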
637    scanner1.next(results);
638    System.out.println(results);
639    assertEquals(1, results.size());
640
641    results.clear();
642    scanner2.next(results);
643    System.out.println(results);
644    assertEquals(0, results.size());
645
646    results.clear();
647    scanner3.next(results);
648    System.out.println(results);
649    assertEquals(0, results.size());
650  }
651
652  @Test
653  public void testToShowNPEOnRegionScannerReseek() throws Exception {
654    byte[] family = Bytes.toBytes("family");
655    this.region = initHRegion(tableName, method, CONF, family);
656
657    Put put = new Put(Bytes.toBytes("r1"));
658    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
659    region.put(put);
660    put = new Put(Bytes.toBytes("r2"));
661    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
662    region.put(put);
663    region.flush(true);
664
665    Scan scan = new Scan();
666    scan.setMaxVersions(3);
667    // open the first scanner
668    RegionScanner scanner1 = region.getScanner(scan);
669
670    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
671
672    region.compact(true);
673
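    // Reseek a scanner that was opened before the compaction; it must keep working against the
    // replaced store files instead of hitting the NPE this test is named after.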
674    scanner1.reseek(Bytes.toBytes("r2"));
675    List<Cell> results = new ArrayList<>();
676    scanner1.next(results);
677    Cell keyValue = results.get(0);
678    Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0);
679    scanner1.close();
680  }
681
682  @Test
683  public void testArchiveRecoveredEditsReplay() throws Exception {
684    byte[] family = Bytes.toBytes("family");
685    this.region = initHRegion(tableName, method, CONF, family);
686    final WALFactory wals = new WALFactory(CONF, method);
687    try {
688      Path regiondir = region.getRegionFileSystem().getRegionDir();
689      FileSystem fs = region.getRegionFileSystem().getFileSystem();
690      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
691
692      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
693
694      long maxSeqId = 1050;
695      long minSeqId = 1000;
696
697      for (long i = minSeqId; i <= maxSeqId; i += 10) {
698        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
699        fs.create(recoveredEdits);
700        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
701
702        long time = System.nanoTime();
703        WALEdit edit = new WALEdit();
704        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
705          .toBytes(i)));
706        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
707          HConstants.DEFAULT_CLUSTER_ID), edit));
708
709        writer.close();
710      }
711      MonitoredTask status = TaskMonitor.get().createStatus(method);
712      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
713      for (HStore store : region.getStores()) {
714        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
715      }
716      CONF.set("hbase.region.archive.recovered.edits", "true");
717      CONF.set(CommonFSUtils.HBASE_WAL_DIR, "/custom_wal_dir");
718      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
719      assertEquals(maxSeqId, seqId);
720      region.getMVCC().advanceTo(seqId);
721      String fakeFamilyName = recoveredEditsDir.getName();
722      Path rootDir = new Path(CONF.get(HConstants.HBASE_DIR));
723      Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(rootDir,
724        region.getRegionInfo(), Bytes.toBytes(fakeFamilyName));
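      // Six recovered-edits files were written above (seq ids 1000, 1010, ..., 1050), so six
      // entries are expected in the archive directory.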
725      FileStatus[] list = TEST_UTIL.getTestFileSystem().listStatus(storeArchiveDir);
726      assertEquals(6, list.length);
727    } finally {
728      CONF.set("hbase.region.archive.recovered.edits", "false");
729      CONF.set(CommonFSUtils.HBASE_WAL_DIR, "");
730      HBaseTestingUtility.closeRegionAndWAL(this.region);
731      this.region = null;
732      wals.close();
733    }
734  }
735
736  @Test
737  public void testSkipRecoveredEditsReplay() throws Exception {
738    byte[] family = Bytes.toBytes("family");
739    this.region = initHRegion(tableName, method, CONF, family);
740    final WALFactory wals = new WALFactory(CONF, method);
741    try {
742      Path regiondir = region.getRegionFileSystem().getRegionDir();
743      FileSystem fs = region.getRegionFileSystem().getFileSystem();
744      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
745
746      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
747
748      long maxSeqId = 1050;
749      long minSeqId = 1000;
750
751      for (long i = minSeqId; i <= maxSeqId; i += 10) {
752        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
753        fs.create(recoveredEdits);
754        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
755
756        long time = System.nanoTime();
757        WALEdit edit = new WALEdit();
758        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
759            .toBytes(i)));
760        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
761            HConstants.DEFAULT_CLUSTER_ID), edit));
762
763        writer.close();
764      }
765      MonitoredTask status = TaskMonitor.get().createStatus(method);
766      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
767      for (HStore store : region.getStores()) {
768        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
769      }
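      // Stores report a max seq id of minSeqId - 1, so every edit written above must be
      // replayed and show up in the Get below.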
770      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
771      assertEquals(maxSeqId, seqId);
772      region.getMVCC().advanceTo(seqId);
773      Get get = new Get(row);
774      Result result = region.get(get);
775      for (long i = minSeqId; i <= maxSeqId; i += 10) {
776        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
777        assertEquals(1, kvs.size());
778        assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
779      }
780    } finally {
781      HBaseTestingUtility.closeRegionAndWAL(this.region);
782      this.region = null;
783      wals.close();
784    }
785  }
786
787  @Test
788  public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
789    byte[] family = Bytes.toBytes("family");
790    this.region = initHRegion(tableName, method, CONF, family);
791    final WALFactory wals = new WALFactory(CONF, method);
792    try {
793      Path regiondir = region.getRegionFileSystem().getRegionDir();
794      FileSystem fs = region.getRegionFileSystem().getFileSystem();
795      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
796
797      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
798
799      long maxSeqId = 1050;
800      long minSeqId = 1000;
801
802      for (long i = minSeqId; i <= maxSeqId; i += 10) {
803        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
804        fs.create(recoveredEdits);
805        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
806
807        long time = System.nanoTime();
808        WALEdit edit = new WALEdit();
809        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
810            .toBytes(i)));
811        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
812            HConstants.DEFAULT_CLUSTER_ID), edit));
813
814        writer.close();
815      }
816      long recoverSeqId = 1030;
817      MonitoredTask status = TaskMonitor.get().createStatus(method);
818      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
819      for (HStore store : region.getStores()) {
820        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
821      }
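      // Stores report a max seq id of recoverSeqId - 1, so edits below recoverSeqId are treated
      // as already flushed and skipped, while the rest must be replayed; the loop below checks
      // both cases.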
822      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
823      assertEquals(maxSeqId, seqId);
824      region.getMVCC().advanceTo(seqId);
825      Get get = new Get(row);
826      Result result = region.get(get);
827      for (long i = minSeqId; i <= maxSeqId; i += 10) {
828        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
829        if (i < recoverSeqId) {
830          assertEquals(0, kvs.size());
831        } else {
832          assertEquals(1, kvs.size());
833          assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
834        }
835      }
836    } finally {
837      HBaseTestingUtility.closeRegionAndWAL(this.region);
838      this.region = null;
839      wals.close();
840    }
841  }
842
843  @Test
844  public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
845    byte[] family = Bytes.toBytes("family");
846    this.region = initHRegion(tableName, method, CONF, family);
847    Path regiondir = region.getRegionFileSystem().getRegionDir();
848    FileSystem fs = region.getRegionFileSystem().getFileSystem();
849
850    Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
851    for (int i = 1000; i < 1050; i += 10) {
852      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
853      FSDataOutputStream dos = fs.create(recoveredEdits);
854      dos.writeInt(i);
855      dos.close();
856    }
857    long minSeqId = 2000;
858    Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1));
859    FSDataOutputStream dos = fs.create(recoveredEdits);
860    dos.close();
861
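    // Every file created above is named with a seq id at or below the stores' max seq id
    // (2000), so replay should skip them all without parsing their contents and simply return
    // that max.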
862    Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
863    for (HStore store : region.getStores()) {
864      maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId);
865    }
866    long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
867    assertEquals(minSeqId, seqId);
868  }
869
870  @Test
871  public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
872    byte[] family = Bytes.toBytes("family");
873    this.region = initHRegion(tableName, method, CONF, family);
874    final WALFactory wals = new WALFactory(CONF, method);
875    try {
876      Path regiondir = region.getRegionFileSystem().getRegionDir();
877      FileSystem fs = region.getRegionFileSystem().getFileSystem();
878      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
879      byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]);
880
881      assertEquals(0, region.getStoreFileList(columns).size());
882
883      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
884
885      long maxSeqId = 1050;
886      long minSeqId = 1000;
887
888      for (long i = minSeqId; i <= maxSeqId; i += 10) {
889        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
890        fs.create(recoveredEdits);
891        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
892
893        long time = System.nanoTime();
894        WALEdit edit = null;
895        if (i == maxSeqId) {
896          edit = WALEdit.createCompaction(region.getRegionInfo(),
            CompactionDescriptor.newBuilder()
              .setTableName(ByteString.copyFrom(tableName.getName()))
              .setFamilyName(ByteString.copyFrom(regionName))
              .setEncodedRegionName(ByteString.copyFrom(regionName))
              .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
              .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
              .build());
904        } else {
905          edit = new WALEdit();
906          edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
907            .toBytes(i)));
908        }
909        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
910            HConstants.DEFAULT_CLUSTER_ID), edit));
911        writer.close();
912      }
913
914      long recoverSeqId = 1030;
915      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
916      MonitoredTask status = TaskMonitor.get().createStatus(method);
917      for (HStore store : region.getStores()) {
918        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
919      }
920      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
921      assertEquals(maxSeqId, seqId);
922
923      // assert that the files are flushed
924      assertEquals(1, region.getStoreFileList(columns).size());
925
926    } finally {
927      HBaseTestingUtility.closeRegionAndWAL(this.region);
928      this.region = null;
929      wals.close();
930    }
931  }
932
933  @Test
934  public void testRecoveredEditsReplayCompaction() throws Exception {
935    testRecoveredEditsReplayCompaction(false);
936    testRecoveredEditsReplayCompaction(true);
937  }
938
939  public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
940    CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class);
941    byte[] family = Bytes.toBytes("family");
942    this.region = initHRegion(tableName, method, CONF, family);
943    final WALFactory wals = new WALFactory(CONF, method);
944    try {
945      Path regiondir = region.getRegionFileSystem().getRegionDir();
946      FileSystem fs = region.getRegionFileSystem().getFileSystem();
947      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
948
949      long maxSeqId = 3;
950      long minSeqId = 0;
951
952      for (long i = minSeqId; i < maxSeqId; i++) {
953        Put put = new Put(Bytes.toBytes(i));
954        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
955        region.put(put);
956        region.flush(true);
957      }
958
959      // this will create a region with 3 files
960      assertEquals(3, region.getStore(family).getStorefilesCount());
961      List<Path> storeFiles = new ArrayList<>(3);
962      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
963        storeFiles.add(sf.getPath());
964      }
965
966      // disable compaction completion
967      CONF.setBoolean("hbase.hstore.compaction.complete", false);
968      region.compactStores();
969
970      // ensure that nothing changed
971      assertEquals(3, region.getStore(family).getStorefilesCount());
972
973      // now find the compacted file, and manually add it to the recovered edits
974      Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
975      FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
976      String errorMsg = "Expected to find 1 file in the region temp directory "
977          + "from the compaction, could not find any";
978      assertNotNull(errorMsg, files);
979      assertEquals(errorMsg, 1, files.length);
980      // move the file inside region dir
981      Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
982          files[0].getPath());
983
984      byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes();
985      byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length];
986      for (int i=0; i < encodedNameAsBytes.length; i++) {
987        // Mix the byte array to have a new encodedName
988        fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1);
989      }
990
991      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
992        .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family,
993            storeFiles, Lists.newArrayList(newFile),
994            region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
995
996      WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
997          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
998
999      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
1000
1001      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1002      fs.create(recoveredEdits);
1003      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1004
1005      long time = System.nanoTime();
1006
1007      writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time,
1008          HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
1009          compactionDescriptor)));
1010      writer.close();
1011
1012      // close the region now, and reopen again
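      // Reopening replays the compaction marker from the recovered edits: with a matching
      // encoded region name the three input store files are swapped for the single compacted
      // output, while a mismatched name should be ignored rather than fail the open.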
1013      region.getTableDescriptor();
1014      region.getRegionInfo();
1015      HBaseTestingUtility.closeRegionAndWAL(this.region);
1016      try {
1017        region = HRegion.openHRegion(region, null);
1018      } catch (WrongRegionException wre) {
1019        fail("Matching encoded region name should not have produced WrongRegionException");
1020      }
1021
1022      // now check whether we have only one store file, the compacted one
1023      Collection<HStoreFile> sfs = region.getStore(family).getStorefiles();
1024      for (HStoreFile sf : sfs) {
1025        LOG.info(Objects.toString(sf.getPath()));
1026      }
1027      if (!mismatchedRegionName) {
1028        assertEquals(1, region.getStore(family).getStorefilesCount());
1029      }
1030      files = FSUtils.listStatus(fs, tmpDir);
1031      assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);
1032
1033      for (long i = minSeqId; i < maxSeqId; i++) {
1034        Get get = new Get(Bytes.toBytes(i));
1035        Result result = region.get(get);
1036        byte[] value = result.getValue(family, Bytes.toBytes(i));
1037        assertArrayEquals(Bytes.toBytes(i), value);
1038      }
1039    } finally {
1040      HBaseTestingUtility.closeRegionAndWAL(this.region);
1041      this.region = null;
1042      wals.close();
1043      CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class);
1044    }
1045  }
1046
1047  @Test
1048  public void testFlushMarkers() throws Exception {
    // Tests that flush markers are written to the WAL and handled during recovered-edits replay.
1050    byte[] family = Bytes.toBytes("family");
1051    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
1052    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1053    FSUtils.setRootDir(walConf, logDir);
1054    final WALFactory wals = new WALFactory(walConf, method);
1055    final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build());
1056
1057    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1058      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1059    try {
1060      Path regiondir = region.getRegionFileSystem().getRegionDir();
1061      FileSystem fs = region.getRegionFileSystem().getFileSystem();
1062      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
1063
1064      long maxSeqId = 3;
1065      long minSeqId = 0;
1066
1067      for (long i = minSeqId; i < maxSeqId; i++) {
1068        Put put = new Put(Bytes.toBytes(i));
1069        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1070        region.put(put);
1071        region.flush(true);
1072      }
1073
1074      // this will create a region with 3 files from flush
1075      assertEquals(3, region.getStore(family).getStorefilesCount());
1076      List<String> storeFiles = new ArrayList<>(3);
1077      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
1078        storeFiles.add(sf.getPath().getName());
1079      }
1080
1081      // now verify that the flush markers are written
1082      wal.shutdown();
1083      WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
1084        TEST_UTIL.getConfiguration());
1085      try {
1086        List<WAL.Entry> flushDescriptors = new ArrayList<>();
1087        long lastFlushSeqId = -1;
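        // Each flush should emit a START_FLUSH/COMMIT_FLUSH pair sharing the same flush
        // sequence number, with that number strictly increasing across flushes.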
1088        while (true) {
1089          WAL.Entry entry = reader.next();
1090          if (entry == null) {
1091            break;
1092          }
1093          Cell cell = entry.getEdit().getCells().get(0);
1094          if (WALEdit.isMetaEditFamily(cell)) {
1095            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
1096            assertNotNull(flushDesc);
1097            assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
1098            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1099              assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
1100            } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1101              assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
1102            }
1103            lastFlushSeqId = flushDesc.getFlushSequenceNumber();
1104            assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
1105            assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
1106            StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
1107            assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
1108            assertEquals("family", storeFlushDesc.getStoreHomeDir());
1109            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1110              assertEquals(0, storeFlushDesc.getFlushOutputCount());
1111            } else {
1112              assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
1113              assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
1114            }
1115
1116            flushDescriptors.add(entry);
1117          }
1118        }
1119
1120        assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
1121
1122        // now write those markers to the recovered edits again.
1123
1124        Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
1125
1126        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1127        fs.create(recoveredEdits);
1128        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1129
1130        for (WAL.Entry entry : flushDescriptors) {
1131          writer.append(entry);
1132        }
1133        writer.close();
1134      } finally {
1135        if (null != reader) {
1136          try {
1137            reader.close();
1138          } catch (IOException exception) {
1139            LOG.warn("Problem closing wal: " + exception.getMessage());
1140            LOG.debug("exception details", exception);
1141          }
1142        }
1143      }
1144
1145      // close the region now, and reopen again
1146      HBaseTestingUtility.closeRegionAndWAL(this.region);
1147      region = HRegion.openHRegion(region, null);
1148
      // now check whether we can read back the data from the region
1150      for (long i = minSeqId; i < maxSeqId; i++) {
1151        Get get = new Get(Bytes.toBytes(i));
1152        Result result = region.get(get);
1153        byte[] value = result.getValue(family, Bytes.toBytes(i));
1154        assertArrayEquals(Bytes.toBytes(i), value);
1155      }
1156    } finally {
1157      HBaseTestingUtility.closeRegionAndWAL(this.region);
1158      this.region = null;
1159      wals.close();
1160    }
1161  }
1162
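  /**
   * Mockito {@link ArgumentMatcher} that matches a WALEdit carrying a flush descriptor whose
   * action is one of the configured {@link FlushAction}s.
   */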
1163  static class IsFlushWALMarker implements ArgumentMatcher<WALEdit> {
1164    volatile FlushAction[] actions;
1165    public IsFlushWALMarker(FlushAction... actions) {
1166      this.actions = actions;
1167    }
1168    @Override
1169    public boolean matches(WALEdit edit) {
1170      List<Cell> cells = edit.getCells();
1171      if (cells.isEmpty()) {
1172        return false;
1173      }
1174      if (WALEdit.isMetaEditFamily(cells.get(0))) {
1175        FlushDescriptor desc;
1176        try {
1177          desc = WALEdit.getFlushDescriptor(cells.get(0));
1178        } catch (IOException e) {
1179          LOG.warn(e.toString(), e);
1180          return false;
1181        }
1182        if (desc != null) {
1183          for (FlushAction action : actions) {
1184            if (desc.getAction() == action) {
1185              return true;
1186            }
1187          }
1188        }
1189      }
1190      return false;
1191    }
1192    public IsFlushWALMarker set(FlushAction... actions) {
1193      this.actions = actions;
1194      return this;
1195    }
1196  }
1197
1198  @Test
1199  public void testFlushMarkersWALFail() throws Exception {
1200    // test the cases where the WAL append for flush markers fail.
1201    byte[] family = Bytes.toBytes("family");
1202
1203    // spy an actual WAL implementation to throw exception (was not able to mock)
1204    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log");
1205
1206    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1207    FSUtils.setRootDir(walConf, logDir);
1208    // Make up a WAL that we can manipulate at append time.
1209    class FailAppendFlushMarkerWAL extends FSHLog {
1210      volatile FlushAction [] flushActions = null;
1211
1212      public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf)
1213      throws IOException {
1214        super(fs, root, logDir, conf);
1215      }
1216
1217      @Override
1218      protected Writer createWriterInstance(Path path) throws IOException {
1219        final Writer w = super.createWriterInstance(path);
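        // Wrap the real writer so that appends of the configured flush-marker actions fail,
        // while every other entry passes through untouched.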
1220        return new Writer() {
1221          @Override
1222          public void close() throws IOException {
1223            w.close();
1224          }
1225
1226          @Override
1227          public void sync(boolean forceSync) throws IOException {
1228            w.sync(forceSync);
1229          }
1230
1231          @Override
1232          public void append(Entry entry) throws IOException {
1233            List<Cell> cells = entry.getEdit().getCells();
1234            if (WALEdit.isMetaEditFamily(cells.get(0))) {
1235              FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0));
1236              if (desc != null) {
1237                for (FlushAction flushAction: flushActions) {
1238                  if (desc.getAction().equals(flushAction)) {
1239                    throw new IOException("Failed to append flush marker! " + flushAction);
1240                  }
1241                }
1242              }
1243            }
1244            w.append(entry);
1245          }
1246
1247          @Override
1248          public long getLength() {
1249            return w.getLength();
1250          }
1251        };
1252      }
1253    }
1254    FailAppendFlushMarkerWAL wal =
1255      new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1256        method, walConf);
1257    wal.init();
1258    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1259      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1260    int i = 0;
1261    Put put = new Put(Bytes.toBytes(i));
1262    put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
1263    put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1264    region.put(put);
1265
1266    // 1. Test case where START_FLUSH throws exception
1267    wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};
1268
1269    // starting the cache flush will throw an exception
1270    try {
1271      region.flush(true);
1272      fail("This should have thrown exception");
1273    } catch (DroppedSnapshotException unexpected) {
1274      // this should not be a DroppedSnapshotException; a DSE would force the RS to abort
1275      throw unexpected;
1276    } catch (IOException expected) {
1277      // expected
1278    }
1279    // The WAL is hosed now. It has two edits appended. We cannot roll the log without it
1280    // throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
1281    region.close(true);
1282    wal.close();
1283
1284    // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
1285    wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
1286    wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1287          method, walConf);
1288    wal.init();
1289    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1290      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1291    region.put(put);
1292    // 3. Test case where ABORT_FLUSH throws an exception.
1293    // Even if ABORT_FLUSH throws, we should not fail with a plain IOException but with a
1294    // DroppedSnapshotException. The COMMIT_FLUSH failure below causes the flush to abort.
1295    wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH};
1296
1297    try {
1298      region.flush(true);
1299      fail("This should have thrown exception");
1300    } catch (DroppedSnapshotException expected) {
1301      // we expect this exception, since we were able to write the snapshot, but failed to
1302      // write the flush marker to WAL
1303    } catch (IOException unexpected) {
1304      throw unexpected;
1305    }
1306  }
1307
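      /**
       * Runs getter threads concurrently with the region's closing flag being flipped and
       * verifies that none of the gets fails with a NullPointerException.
       */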
1308  @Test
1309  public void testGetWhileRegionClose() throws IOException {
1310    Configuration hc = initSplit();
1311    int numRows = 100;
1312    byte[][] families = { fam1, fam2, fam3 };
1313
1314    // Setting up region
1315    this.region = initHRegion(tableName, method, hc, families);
1316    // Put data in region
1317    final int startRow = 100;
1318    putData(startRow, numRows, qual1, families);
1319    putData(startRow, numRows, qual2, families);
1320    putData(startRow, numRows, qual3, families);
1321    final AtomicBoolean done = new AtomicBoolean(false);
1322    final AtomicInteger gets = new AtomicInteger(0);
1323    GetTillDoneOrException[] threads = new GetTillDoneOrException[10];
1324    try {
1325      // Set ten threads running concurrently getting from the region.
1326      for (int i = 0; i < threads.length / 2; i++) {
1327        threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1328        threads[i].setDaemon(true);
1329        threads[i].start();
1330      }
1331      // Artificially create the condition by setting the closing flag explicitly;
1332      // the issue could not be reproduced with a plain call to region.close().
1333      this.region.closing.set(true);
1334      for (int i = threads.length / 2; i < threads.length; i++) {
1335        threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1336        threads[i].setDaemon(true);
1337        threads[i].start();
1338      }
1339    } finally {
1340      if (this.region != null) {
1341        HBaseTestingUtility.closeRegionAndWAL(this.region);
1342        this.region = null;
1343      }
1344    }
1345    done.set(true);
1346    for (GetTillDoneOrException t : threads) {
1347      try {
1348        t.join();
1349      } catch (InterruptedException e) {
1350        e.printStackTrace();
1351      }
1352      if (t.e != null) {
1353        LOG.info("Exception=" + t.e);
1354        assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
1355      }
1356    }
1357  }
1358
1359  /*
1360   * Thread that repeatedly gets a single row until the 'done' flag is flipped. If an
1361   * exception causes it to fail, the exception is recorded.
1362   */
1363  class GetTillDoneOrException extends Thread {
1364    private final Get g;
1365    private final AtomicBoolean done;
1366    private final AtomicInteger count;
1367    private Exception e;
1368
1369    GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d,
1370        final AtomicInteger c) {
1371      super("getter." + i);
1372      this.g = new Get(r);
1373      this.done = d;
1374      this.count = c;
1375    }
1376
1377    @Override
1378    public void run() {
1379      while (!this.done.get()) {
1380        try {
1381          assertTrue(region.get(g).size() > 0);
1382          this.count.incrementAndGet();
1383        } catch (Exception e) {
1384          this.e = e;
1385          break;
1386        }
1387      }
1388    }
1389  }
1390
1391  /*
1392   * An involved filter test. Has multiple column families with deletes mixed in.
1393   */
1394  @Test
1395  public void testWeirdCacheBehaviour() throws Exception {
1396    final TableName tableName = TableName.valueOf(name.getMethodName());
1397    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
1398        Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
1399    this.region = initHRegion(tableName, method, CONF, FAMILIES);
1400    String value = "this is the value";
1401    String value2 = "this is some other value";
1402    String keyPrefix1 = "prefix1";
1403    String keyPrefix2 = "prefix2";
1404    String keyPrefix3 = "prefix3";
1405    putRows(this.region, 3, value, keyPrefix1);
1406    putRows(this.region, 3, value, keyPrefix2);
1407    putRows(this.region, 3, value, keyPrefix3);
1408    putRows(this.region, 3, value2, keyPrefix1);
1409    putRows(this.region, 3, value2, keyPrefix2);
1410    putRows(this.region, 3, value2, keyPrefix3);
1411    System.out.println("Checking values for key: " + keyPrefix1);
1412    assertEquals("Got back incorrect number of rows from scan", 3,
1413        getNumberOfRows(keyPrefix1, value2, this.region));
1414    System.out.println("Checking values for key: " + keyPrefix2);
1415    assertEquals("Got back incorrect number of rows from scan", 3,
1416        getNumberOfRows(keyPrefix2, value2, this.region));
1417    System.out.println("Checking values for key: " + keyPrefix3);
1418    assertEquals("Got back incorrect number of rows from scan", 3,
1419        getNumberOfRows(keyPrefix3, value2, this.region));
1420    deleteColumns(this.region, value2, keyPrefix1);
1421    deleteColumns(this.region, value2, keyPrefix2);
1422    deleteColumns(this.region, value2, keyPrefix3);
1423    System.out.println("Starting important checks.....");
1424    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0,
1425        getNumberOfRows(keyPrefix1, value2, this.region));
1426    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0,
1427        getNumberOfRows(keyPrefix2, value2, this.region));
1428    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0,
1429        getNumberOfRows(keyPrefix3, value2, this.region));
1430  }
1431
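      /**
       * An Append against a read-only table is expected to fail with an IOException.
       */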
1432  @Test
1433  public void testAppendWithReadOnlyTable() throws Exception {
1434    final TableName tableName = TableName.valueOf(name.getMethodName());
1435    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1436    boolean exceptionCaught = false;
1437    Append append = new Append(Bytes.toBytes("somerow"));
1438    append.setDurability(Durability.SKIP_WAL);
1439    append.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"),
1440        Bytes.toBytes("somevalue"));
1441    try {
1442      region.append(append);
1443    } catch (IOException e) {
1444      exceptionCaught = true;
1445    }
1446    assertTrue(exceptionCaught);
1447  }
1448
1449  @Test
1450  public void testIncrWithReadOnlyTable() throws Exception {
1451    final TableName tableName = TableName.valueOf(name.getMethodName());
1452    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1453    boolean exceptionCaught = false;
1454    Increment inc = new Increment(Bytes.toBytes("somerow"));
1455    inc.setDurability(Durability.SKIP_WAL);
1456    inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);
1457    try {
1458      region.increment(inc);
1459    } catch (IOException e) {
1460      exceptionCaught = true;
1461    }
1462    assertTrue(exceptionCaught);
1463  }
1464
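      /**
       * Scans rows matching the given key prefix and value (see buildScanner) and deletes the
       * trans-tags:qual2 column from each matching row; expects exactly three rows to match.
       */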
1465  private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException {
1466    InternalScanner scanner = buildScanner(keyPrefix, value, r);
1467    int count = 0;
1468    boolean more = false;
1469    List<Cell> results = new ArrayList<>();
1470    do {
1471      more = scanner.next(results);
1472      if (results != null && !results.isEmpty())
1473        count++;
1474      else
1475        break;
1476      Delete delete = new Delete(CellUtil.cloneRow(results.get(0)));
1477      delete.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
1478      r.delete(delete);
1479      results.clear();
1480    } while (more);
1481    assertEquals("Did not perform correct number of deletes", 3, count);
1482  }
1483
1484  private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception {
1485    InternalScanner resultScanner = buildScanner(keyPrefix, value, r);
1486    int numberOfResults = 0;
1487    List<Cell> results = new ArrayList<>();
1488    boolean more = false;
1489    do {
1490      more = resultScanner.next(results);
1491      if (results != null && !results.isEmpty())
1492        numberOfResults++;
1493      else
1494        break;
1495      for (Cell kv : results) {
1496        System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv)));
1497      }
1498      results.clear();
1499    } while (more);
1500    return numberOfResults;
1501  }
1502
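      /**
       * Builds a scanner over the five trans-* families, restricted to rows with the given key
       * prefix whose trans-tags:qual2 column equals the given value.
       */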
1503  private InternalScanner buildScanner(String keyPrefix, String value, HRegion r)
1504      throws IOException {
1505    // Defaults to FilterList.Operator.MUST_PASS_ALL.
1506    FilterList allFilters = new FilterList();
1507    allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
1508    // Only return rows where this column value exists in the row.
1509    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"),
1510        Bytes.toBytes("qual2"), CompareOperator.EQUAL, Bytes.toBytes(value));
1511    filter.setFilterIfMissing(true);
1512    allFilters.addFilter(filter);
1513    Scan scan = new Scan();
1514    scan.addFamily(Bytes.toBytes("trans-blob"));
1515    scan.addFamily(Bytes.toBytes("trans-type"));
1516    scan.addFamily(Bytes.toBytes("trans-date"));
1517    scan.addFamily(Bytes.toBytes("trans-tags"));
1518    scan.addFamily(Bytes.toBytes("trans-group"));
1519    scan.setFilter(allFilters);
1520    return r.getScanner(scan);
1521  }
1522
1523  private void putRows(HRegion r, int numRows, String value, String key) throws IOException {
1524    for (int i = 0; i < numRows; i++) {
1525      String row = key + "_" + i/* UUID.randomUUID().toString() */;
1526      System.out.println(String.format("Saving row: %s, with value %s", row, value));
1527      Put put = new Put(Bytes.toBytes(row));
1528      put.setDurability(Durability.SKIP_WAL);
1529      put.addColumn(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob"));
1530      put.addColumn(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
1531      put.addColumn(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999"));
1532      put.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value));
1533      put.addColumn(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId"));
1534      r.put(put);
1535    }
1536  }
1537
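      /**
       * A Put that names the column family with a trailing ':' should be rejected with a
       * NoSuchColumnFamilyException.
       */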
1538  @Test
1539  public void testFamilyWithAndWithoutColon() throws Exception {
1540    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1541    this.region = initHRegion(tableName, method, CONF, cf);
1542    Put p = new Put(tableName.toBytes());
1543    byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
1544    p.addColumn(cfwithcolon, cfwithcolon, cfwithcolon);
1545    boolean exception = false;
1546    try {
1547      this.region.put(p);
1548    } catch (NoSuchColumnFamilyException e) {
1549      exception = true;
1550    }
1551    assertTrue(exception);
1552  }
1553
1554  @Test
1555  public void testBatchPut_whileNoRowLocksHeld() throws IOException {
1556    final Put[] puts = new Put[10];
1557    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1558    long syncs = prepareRegionForBachPut(puts, source, false);
1559
1560    OperationStatus[] codes = this.region.batchMutate(puts);
1561    assertEquals(10, codes.length);
1562    for (int i = 0; i < 10; i++) {
1563      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1564    }
1565    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1566
1567    LOG.info("Next a batch put with one invalid family");
1568    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1569    codes = this.region.batchMutate(puts);
1570    assertEquals(10, codes.length);
1571    for (int i = 0; i < 10; i++) {
1572      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1573          codes[i].getOperationStatusCode());
1574    }
1575
1576    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
1577  }
1578
1579  @Test
1580  public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
1581    final Put[] puts = new Put[10];
1582    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1583    long syncs = prepareRegionForBachPut(puts, source, false);
1584
1585    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1586
1587    LOG.info("batchPut will have to break into four batches to avoid row locks");
1588    RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
1589    RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1"));
1590    RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
1591    RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);
1592
1593    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1594    final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>();
1595    final CountDownLatch startingPuts = new CountDownLatch(1);
1596    final CountDownLatch startingClose = new CountDownLatch(1);
1597    TestThread putter = new TestThread(ctx) {
1598      @Override
1599      public void doWork() throws IOException {
1600        startingPuts.countDown();
1601        retFromThread.set(region.batchMutate(puts));
1602      }
1603    };
1604    LOG.info("...starting put thread while holding locks");
1605    ctx.addThread(putter);
1606    ctx.startThreads();
1607
1608    // Now attempt to close the region from another thread.  Prior to HBASE-12565
1609    // this would cause the in-progress batchMutate operation to fail with
1610    // an exception because it used to release and re-acquire the close-guard lock
1611    // between batches.  The caller then didn't get status indicating which writes succeeded.
1612    // We now expect this thread to block until the batchMutate call finishes.
1613    Thread regionCloseThread = new TestThread(ctx) {
1614      @Override
1615      public void doWork() {
1616        try {
1617          startingPuts.await();
1618          // Give some time for the batch mutate to get in.
1619          // We don't want to race with the mutate
1620          Thread.sleep(10);
1621          startingClose.countDown();
1622          HBaseTestingUtility.closeRegionAndWAL(region);
1623          region = null;
1624        } catch (IOException e) {
1625          throw new RuntimeException(e);
1626        } catch (InterruptedException e) {
1627          throw new RuntimeException(e);
1628        }
1629      }
1630    };
1631    regionCloseThread.start();
1632
1633    startingClose.await();
1634    startingPuts.await();
1635    Thread.sleep(100);
1636    LOG.info("...releasing row lock 1, which should let put thread continue");
1637    rowLock1.release();
1638    rowLock2.release();
1639    rowLock3.release();
1640    waitForCounter(source, "syncTimeNumOps", syncs + 1);
1641
1642    LOG.info("...joining on put thread");
1643    ctx.stop();
1644    regionCloseThread.join();
1645
1646    OperationStatus[] codes = retFromThread.get();
1647    for (int i = 0; i < codes.length; i++) {
1648      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1649          codes[i].getOperationStatusCode());
1650    }
1651    rowLock4.release();
1652  }
1653
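      /**
       * Polls the metrics source until the named counter reaches expectedCount, failing the
       * test if it does not get there within ten seconds.
       */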
1654  private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
1655      throws InterruptedException {
1656    long startWait = System.currentTimeMillis();
1657    long currentCount;
1658    while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
1659      Thread.sleep(100);
1660      if (System.currentTimeMillis() - startWait > 10000) {
1661        fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
1662            expectedCount, currentCount));
1663      }
1664    }
1665  }
1666
1667  @Test
1668  public void testAtomicBatchPut() throws IOException {
1669    final Put[] puts = new Put[10];
1670    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1671    long syncs = prepareRegionForBachPut(puts, source, false);
1672
1673    // 1. Straight forward case, should succeed
1674    MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true,
1675        HConstants.NO_NONCE, HConstants.NO_NONCE);
1676    OperationStatus[] codes = this.region.batchMutate(batchOp);
1677    assertEquals(10, codes.length);
1678    for (int i = 0; i < 10; i++) {
1679      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1680    }
1681    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1682
1683    // 2. Failed to get lock
1684    RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
1685    // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this
1686    // thread, we need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in another thread
1687    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1688    final AtomicReference<IOException> retFromThread = new AtomicReference<>();
1689    final CountDownLatch finishedPuts = new CountDownLatch(1);
1690    final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
1691        HConstants
1692        .NO_NONCE,
1693        HConstants.NO_NONCE);
1694    TestThread putter = new TestThread(ctx) {
1695      @Override
1696      public void doWork() throws IOException {
1697        try {
1698          region.batchMutate(finalBatchOp);
1699        } catch (IOException ioe) {
1700          LOG.error("test failed!", ioe);
1701          retFromThread.set(ioe);
1702        }
1703        finishedPuts.countDown();
1704      }
1705    };
1706    LOG.info("...starting put thread while holding locks");
1707    ctx.addThread(putter);
1708    ctx.startThreads();
1709    LOG.info("...waiting for batch puts while holding locks");
1710    try {
1711      finishedPuts.await();
1712    } catch (InterruptedException e) {
1713      LOG.error("Interrupted!", e);
1714    } finally {
1715      if (lock != null) {
1716        lock.release();
1717      }
1718    }
1719    assertNotNull(retFromThread.get());
1720    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1721
1722    // 3. Exception thrown in validation
1723    LOG.info("Next a batch put with one invalid family");
1724    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1725    batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
1726        HConstants.NO_NONCE);
1727    thrown.expect(NoSuchColumnFamilyException.class);
1728    this.region.batchMutate(batchOp);
1729  }
1730
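      /**
       * With a one-second timestamp slop configured, every put in the batch carries a far-future
       * timestamp, so each should fail the sanity check and no extra WAL sync should happen.
       */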
1731  @Test
1732  public void testBatchPutWithTsSlop() throws Exception {
1733    // add data with timestamps beyond the allowed slop; ensure the sanity check fails
1734    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1735    final Put[] puts = new Put[10];
1736    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1737
1738    long syncs = prepareRegionForBachPut(puts, source, true);
1739
1740    OperationStatus[] codes = this.region.batchMutate(puts);
1741    assertEquals(10, codes.length);
1742    for (int i = 0; i < 10; i++) {
1743      assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
1744    }
1745    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1746  }
1747
1748  /**
1749   * @return the initial syncTimeNumOps counter value before the batch puts run
1750   */
1751  private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source,
1752      boolean slop) throws IOException {
1753    this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
1754
1755    LOG.info("First a batch put with all valid puts");
1756    for (int i = 0; i < puts.length; i++) {
1757      puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) :
1758          new Put(Bytes.toBytes("row_" + i));
1759      puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value);
1760    }
1761
1762    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1763    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1764    return syncs;
1765  }
1766
1767  // ////////////////////////////////////////////////////////////////////////////
1768  // checkAndMutate tests
1769  // ////////////////////////////////////////////////////////////////////////////
1770  @Test
1771  public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
1772    byte[] row1 = Bytes.toBytes("row1");
1773    byte[] fam1 = Bytes.toBytes("fam1");
1774    byte[] qf1 = Bytes.toBytes("qualifier");
1775    byte[] emptyVal = new byte[] {};
1776    byte[] val1 = Bytes.toBytes("value1");
1777    byte[] val2 = Bytes.toBytes("value2");
1778
1779    // Setting up region
1780    this.region = initHRegion(tableName, method, CONF, fam1);
1781    // Putting empty data in key
1782    Put put = new Put(row1);
1783    put.addColumn(fam1, qf1, emptyVal);
1784
1785    // checkAndPut with empty value
1786    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1787        new BinaryComparator(emptyVal), put);
1788    assertTrue(res);
1789
1790    // Putting data in key
1791    put = new Put(row1);
1792    put.addColumn(fam1, qf1, val1);
1793
1794    // checkAndPut with correct value
1795    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1796        new BinaryComparator(emptyVal), put);
1797    assertTrue(res);
1798
1799    // not empty anymore
1800    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1801        new BinaryComparator(emptyVal), put);
1802    assertFalse(res);
1803
1804    Delete delete = new Delete(row1);
1805    delete.addColumn(fam1, qf1);
1806    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1807        new BinaryComparator(emptyVal), delete);
1808    assertFalse(res);
1809
1810    put = new Put(row1);
1811    put.addColumn(fam1, qf1, val2);
1812    // checkAndPut with correct value
1813    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1814        new BinaryComparator(val1), put);
1815    assertTrue(res);
1816
1817    // checkAndDelete with correct value
1818    delete = new Delete(row1);
1819    delete.addColumn(fam1, qf1);
1820    delete.addColumn(fam1, qf1);
1821    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1822        new BinaryComparator(val2), delete);
1823    assertTrue(res);
1824
1825    delete = new Delete(row1);
1826    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1827        new BinaryComparator(emptyVal), delete);
1828    assertTrue(res);
1829
1830    // checkAndPut looking for a null value
1831    put = new Put(row1);
1832    put.addColumn(fam1, qf1, val1);
1833
1834    res = region
1835        .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put);
1836    assertTrue(res);
1837  }
1838
1839  @Test
1840  public void testCheckAndMutate_WithWrongValue() throws IOException {
1841    byte[] row1 = Bytes.toBytes("row1");
1842    byte[] fam1 = Bytes.toBytes("fam1");
1843    byte[] qf1 = Bytes.toBytes("qualifier");
1844    byte[] val1 = Bytes.toBytes("value1");
1845    byte[] val2 = Bytes.toBytes("value2");
1846    BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
1847    BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
1848
1849    // Setting up region
1850    this.region = initHRegion(tableName, method, CONF, fam1);
1851    // Putting data in key
1852    Put put = new Put(row1);
1853    put.addColumn(fam1, qf1, val1);
1854    region.put(put);
1855
1856    // checkAndPut with wrong value
1857    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1858        new BinaryComparator(val2), put);
1859    assertEquals(false, res);
1860
1861    // checkAndDelete with wrong value
1862    Delete delete = new Delete(row1);
1863    delete.addFamily(fam1);
1864    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1865        new BinaryComparator(val2), put);
1866    assertEquals(false, res);
1867
1868    // Putting data in key
1869    put = new Put(row1);
1870    put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1871    region.put(put);
1872
1873    // checkAndPut with wrong value
1874    res =
1875        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1876            new BigDecimalComparator(bd2), put);
1877    assertEquals(false, res);
1878
1879    // checkAndDelete with wrong value
1880    delete = new Delete(row1);
1881    delete.addFamily(fam1);
1882    res =
1883        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1884            new BigDecimalComparator(bd2), put);
1885    assertEquals(false, res);
1886  }
1887
1888  @Test
1889  public void testCheckAndMutate_WithCorrectValue() throws IOException {
1890    byte[] row1 = Bytes.toBytes("row1");
1891    byte[] fam1 = Bytes.toBytes("fam1");
1892    byte[] qf1 = Bytes.toBytes("qualifier");
1893    byte[] val1 = Bytes.toBytes("value1");
1894    BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
1895
1896    // Setting up region
1897    this.region = initHRegion(tableName, method, CONF, fam1);
1898    // Putting data in key
1899    Put put = new Put(row1);
1900    put.addColumn(fam1, qf1, val1);
1901    region.put(put);
1902
1903    // checkAndPut with correct value
1904    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1905        new BinaryComparator(val1), put);
1906    assertEquals(true, res);
1907
1908    // checkAndDelete with correct value
1909    Delete delete = new Delete(row1);
1910    delete.addColumn(fam1, qf1);
1911    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
1912        delete);
1913    assertEquals(true, res);
1914
1915    // Putting data in key
1916    put = new Put(row1);
1917    put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1918    region.put(put);
1919
1920    // checkAndPut with correct value
1921    res =
1922        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1923            bd1), put);
1924    assertEquals(true, res);
1925
1926    // checkAndDelete with correct value
1927    delete = new Delete(row1);
1928    delete.addColumn(fam1, qf1);
1929    res =
1930        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1931            bd1), delete);
1932    assertEquals(true, res);
1933  }
1934
1935  @Test
1936  public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
1937    byte[] row1 = Bytes.toBytes("row1");
1938    byte[] fam1 = Bytes.toBytes("fam1");
1939    byte[] qf1 = Bytes.toBytes("qualifier");
1940    byte[] val1 = Bytes.toBytes("value1");
1941    byte[] val2 = Bytes.toBytes("value2");
1942    byte[] val3 = Bytes.toBytes("value3");
1943    byte[] val4 = Bytes.toBytes("value4");
1944
1945    // Setting up region
1946    this.region = initHRegion(tableName, method, CONF, fam1);
1947    // Putting val3 in key
1948    Put put = new Put(row1);
1949    put.addColumn(fam1, qf1, val3);
1950    region.put(put);
1951
1952    // Test CompareOp.LESS: original = val3, compare with val3, fail
1953    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1954        new BinaryComparator(val3), put);
1955    assertEquals(false, res);
1956
1957    // Test CompareOp.LESS: original = val3, compare with val4, fail
1958    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1959        new BinaryComparator(val4), put);
1960    assertEquals(false, res);
1961
1962    // Test CompareOp.LESS: original = val3, compare with val2,
1963    // succeed (now value = val2)
1964    put = new Put(row1);
1965    put.addColumn(fam1, qf1, val2);
1966    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1967        new BinaryComparator(val2), put);
1968    assertEquals(true, res);
1969
1970    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
1971    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1972        new BinaryComparator(val3), put);
1973    assertEquals(false, res);
1974
1975    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
1976    // succeed (value still = val2)
1977    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1978        new BinaryComparator(val2), put);
1979    assertEquals(true, res);
1980
1981    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
1982    // succeed (now value = val3)
1983    put = new Put(row1);
1984    put.addColumn(fam1, qf1, val3);
1985    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1986        new BinaryComparator(val1), put);
1987    assertEquals(true, res);
1988
1989    // Test CompareOp.GREATER: original = val3, compare with val3, fail
1990    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
1991        new BinaryComparator(val3), put);
1992    assertEquals(false, res);
1993
1994    // Test CompareOp.GREATER: original = val3, compare with val2, fail
1995    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
1996        new BinaryComparator(val2), put);
1997    assertEquals(false, res);
1998
1999    // Test CompareOp.GREATER: original = val3, compare with val4,
2000    // succeed (now value = val2)
2001    put = new Put(row1);
2002    put.addColumn(fam1, qf1, val2);
2003    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
2004        new BinaryComparator(val4), put);
2005    assertEquals(true, res);
2006
2007    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
2008    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2009        new BinaryComparator(val1), put);
2010    assertEquals(false, res);
2011
2012    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
2013    // succeed (value still = val2)
2014    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2015        new BinaryComparator(val2), put);
2016    assertEquals(true, res);
2017
2018    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
2019    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2020        new BinaryComparator(val3), put);
2021    assertEquals(true, res);
2022  }
2023
2024  @Test
2025  public void testCheckAndPut_ThatPutWasWritten() throws IOException {
2026    byte[] row1 = Bytes.toBytes("row1");
2027    byte[] fam1 = Bytes.toBytes("fam1");
2028    byte[] fam2 = Bytes.toBytes("fam2");
2029    byte[] qf1 = Bytes.toBytes("qualifier");
2030    byte[] val1 = Bytes.toBytes("value1");
2031    byte[] val2 = Bytes.toBytes("value2");
2032
2033    byte[][] families = { fam1, fam2 };
2034
2035    // Setting up region
2036    this.region = initHRegion(tableName, method, CONF, families);
2037    // Putting data in the key to check
2038    Put put = new Put(row1);
2039    put.addColumn(fam1, qf1, val1);
2040    region.put(put);
2041
2042    // Creating put to add
2043    long ts = System.currentTimeMillis();
2044    KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
2045    put = new Put(row1);
2046    put.add(kv);
2047
2048    // checkAndPut with correct value
2049    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
2050        new BinaryComparator(val1), put);
2051    assertEquals(true, res);
2052
2053    Get get = new Get(row1);
2054    get.addColumn(fam2, qf1);
2055    Cell[] actual = region.get(get).rawCells();
2056
2057    Cell[] expected = { kv };
2058
2059    assertEquals(expected.length, actual.length);
2060    for (int i = 0; i < actual.length; i++) {
2061      assertEquals(expected[i], actual[i]);
2062    }
2063  }
2064
2065  @Test
2066  public void testCheckAndPut_wrongRowInPut() throws IOException {
2067    this.region = initHRegion(tableName, method, CONF, COLUMNS);
2068    Put put = new Put(row2);
2069    put.addColumn(fam1, qual1, value1);
2070    try {
2071      region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL,
2072          new BinaryComparator(value2), put);
2073      fail();
2074    } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
2075      // expected exception.
2076    }
2077  }
2078
2079  @Test
2080  public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
2081    byte[] row1 = Bytes.toBytes("row1");
2082    byte[] fam1 = Bytes.toBytes("fam1");
2083    byte[] fam2 = Bytes.toBytes("fam2");
2084    byte[] qf1 = Bytes.toBytes("qualifier1");
2085    byte[] qf2 = Bytes.toBytes("qualifier2");
2086    byte[] qf3 = Bytes.toBytes("qualifier3");
2087    byte[] val1 = Bytes.toBytes("value1");
2088    byte[] val2 = Bytes.toBytes("value2");
2089    byte[] val3 = Bytes.toBytes("value3");
2090    byte[] emptyVal = new byte[] {};
2091
2092    byte[][] families = { fam1, fam2 };
2093
2094    // Setting up region
2095    this.region = initHRegion(tableName, method, CONF, families);
2096    // Put content
2097    Put put = new Put(row1);
2098    put.addColumn(fam1, qf1, val1);
2099    region.put(put);
2100    Threads.sleep(2);
2101
2102    put = new Put(row1);
2103    put.addColumn(fam1, qf1, val2);
2104    put.addColumn(fam2, qf1, val3);
2105    put.addColumn(fam2, qf2, val2);
2106    put.addColumn(fam2, qf3, val1);
2107    put.addColumn(fam1, qf3, val1);
2108    region.put(put);
2109
2110    // Multi-column delete
2111    Delete delete = new Delete(row1);
2112    delete.addColumn(fam1, qf1);
2113    delete.addColumn(fam2, qf1);
2114    delete.addColumn(fam1, qf3);
2115    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
2116        new BinaryComparator(val2), delete);
2117    assertEquals(true, res);
2118
2119    Get get = new Get(row1);
2120    get.addColumn(fam1, qf1);
2121    get.addColumn(fam1, qf3);
2122    get.addColumn(fam2, qf2);
2123    Result r = region.get(get);
2124    assertEquals(2, r.size());
2125    assertArrayEquals(val1, r.getValue(fam1, qf1));
2126    assertArrayEquals(val2, r.getValue(fam2, qf2));
2127
2128    // Family delete
2129    delete = new Delete(row1);
2130    delete.addFamily(fam2);
2131    res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL,
2132        new BinaryComparator(emptyVal), delete);
2133    assertEquals(true, res);
2134
2135    get = new Get(row1);
2136    r = region.get(get);
2137    assertEquals(1, r.size());
2138    assertArrayEquals(val1, r.getValue(fam1, qf1));
2139
2140    // Row delete
2141    delete = new Delete(row1);
2142    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
2143        delete);
2144    assertEquals(true, res);
2145    get = new Get(row1);
2146    r = region.get(get);
2147    assertEquals(0, r.size());
2148  }
2149
2150  // ////////////////////////////////////////////////////////////////////////////
2151  // Delete tests
2152  // ////////////////////////////////////////////////////////////////////////////
2153  @Test
2154  public void testDelete_multiDeleteColumn() throws IOException {
2155    byte[] row1 = Bytes.toBytes("row1");
2156    byte[] fam1 = Bytes.toBytes("fam1");
2157    byte[] qual = Bytes.toBytes("qualifier");
2158    byte[] value = Bytes.toBytes("value");
2159
2160    Put put = new Put(row1);
2161    put.addColumn(fam1, qual, 1, value);
2162    put.addColumn(fam1, qual, 2, value);
2163
2164    this.region = initHRegion(tableName, method, CONF, fam1);
2165    region.put(put);
2166
2167    // We do support deleting more than 1 'latest' version
2168    Delete delete = new Delete(row1);
2169    delete.addColumn(fam1, qual);
2170    delete.addColumn(fam1, qual);
2171    region.delete(delete);
2172
2173    Get get = new Get(row1);
2174    get.addFamily(fam1);
2175    Result r = region.get(get);
2176    assertEquals(0, r.size());
2177  }
2178
2179  @Test
2180  public void testDelete_CheckFamily() throws IOException {
2181    byte[] row1 = Bytes.toBytes("row1");
2182    byte[] fam1 = Bytes.toBytes("fam1");
2183    byte[] fam2 = Bytes.toBytes("fam2");
2184    byte[] fam3 = Bytes.toBytes("fam3");
2185    byte[] fam4 = Bytes.toBytes("fam4");
2186
2187    // Setting up region
2188    this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
2189    List<Cell> kvs = new ArrayList<>();
2190    kvs.add(new KeyValue(row1, fam4, null, null));
2191
2192    // testing existing family
2193    byte[] family = fam2;
2194    NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2195    deleteMap.put(family, kvs);
2196    region.delete(deleteMap, Durability.SYNC_WAL);
2197
2198    // testing non existing family
2199    boolean ok = false;
2200    family = fam4;
2201    try {
2202      deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2203      deleteMap.put(family, kvs);
2204      region.delete(deleteMap, Durability.SYNC_WAL);
2205    } catch (Exception e) {
2206      ok = true;
2207    }
2208    assertTrue("Missing family " + new String(family, StandardCharsets.UTF_8) + " should fail", ok);
2209  }
2210
2211  @Test
2212  public void testDelete_mixed() throws IOException, InterruptedException {
2213    byte[] fam = Bytes.toBytes("info");
2214    byte[][] families = { fam };
2215    this.region = initHRegion(tableName, method, CONF, families);
2216    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2217
2218    byte[] row = Bytes.toBytes("table_name");
2219    // column names
2220    byte[] serverinfo = Bytes.toBytes("serverinfo");
2221    byte[] splitA = Bytes.toBytes("splitA");
2222    byte[] splitB = Bytes.toBytes("splitB");
2223
2224    // add some data:
2225    Put put = new Put(row);
2226    put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2227    region.put(put);
2228
2229    put = new Put(row);
2230    put.addColumn(fam, splitB, Bytes.toBytes("reference_B"));
2231    region.put(put);
2232
2233    put = new Put(row);
2234    put.addColumn(fam, serverinfo, Bytes.toBytes("ip_address"));
2235    region.put(put);
2236
2237    // ok now delete a split:
2238    Delete delete = new Delete(row);
2239    delete.addColumns(fam, splitA);
2240    region.delete(delete);
2241
2242    // assert some things:
2243    Get get = new Get(row).addColumn(fam, serverinfo);
2244    Result result = region.get(get);
2245    assertEquals(1, result.size());
2246
2247    get = new Get(row).addColumn(fam, splitA);
2248    result = region.get(get);
2249    assertEquals(0, result.size());
2250
2251    get = new Get(row).addColumn(fam, splitB);
2252    result = region.get(get);
2253    assertEquals(1, result.size());
2254
2255    // Assert that after a delete, I can put.
2256    put = new Put(row);
2257    put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2258    region.put(put);
2259    get = new Get(row);
2260    result = region.get(get);
2261    assertEquals(3, result.size());
2262
2263    // Now delete all... then test I can add stuff back
2264    delete = new Delete(row);
2265    region.delete(delete);
2266    assertEquals(0, region.get(get).size());
2267
2268    region.put(new Put(row).addColumn(fam, splitA, Bytes.toBytes("reference_A")));
2269    result = region.get(get);
2270    assertEquals(1, result.size());
2271  }
2272
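      /**
       * A delete stamped with the current time must not remove a cell written with a far-future
       * timestamp; only a delete stamped beyond that timestamp removes it.
       */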
2273  @Test
2274  public void testDeleteRowWithFutureTs() throws IOException {
2275    byte[] fam = Bytes.toBytes("info");
2276    byte[][] families = { fam };
2277    this.region = initHRegion(tableName, method, CONF, families);
2278    byte[] row = Bytes.toBytes("table_name");
2279    // column names
2280    byte[] serverinfo = Bytes.toBytes("serverinfo");
2281
2282    // add data in the far future
2283    Put put = new Put(row);
2284    put.addColumn(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value"));
2285    region.put(put);
2286
2287    // now delete something in the present
2288    Delete delete = new Delete(row);
2289    region.delete(delete);
2290
2291    // make sure we still see our data
2292    Get get = new Get(row).addColumn(fam, serverinfo);
2293    Result result = region.get(get);
2294    assertEquals(1, result.size());
2295
2296    // delete the future row
2297    delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3);
2298    region.delete(delete);
2299
2300    // make sure it is gone
2301    get = new Get(row).addColumn(fam, serverinfo);
2302    result = region.get(get);
2303    assertEquals(0, result.size());
2304  }
2305
2306  /**
2307   * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
2308   * the actual timestamp
2309   */
2310  @Test
2311  public void testPutWithLatestTS() throws IOException {
2312    byte[] fam = Bytes.toBytes("info");
2313    byte[][] families = { fam };
2314    this.region = initHRegion(tableName, method, CONF, families);
2315    byte[] row = Bytes.toBytes("row1");
2316    // column names
2317    byte[] qual = Bytes.toBytes("qual");
2318
2319    // add data with LATEST_TIMESTAMP, put without WAL
2320    Put put = new Put(row);
2321    put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2322    region.put(put);
2323
2324    // Make sure it shows up with an actual timestamp
2325    Get get = new Get(row).addColumn(fam, qual);
2326    Result result = region.get(get);
2327    assertEquals(1, result.size());
2328    Cell kv = result.rawCells()[0];
2329    LOG.info("Got: " + kv);
2330    assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2331        kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2332
2333    // Check same with WAL enabled (historically these took different
2334    // code paths, so check both)
2335    row = Bytes.toBytes("row2");
2336    put = new Put(row);
2337    put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2338    region.put(put);
2339
2340    // Make sure it shows up with an actual timestamp
2341    get = new Get(row).addColumn(fam, qual);
2342    result = region.get(get);
2343    assertEquals(1, result.size());
2344    kv = result.rawCells()[0];
2345    LOG.info("Got: " + kv);
2346    assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2347        kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2348  }
2349
2350  /**
2351   * Tests that there is server-side filtering for invalid timestamp upper
2352   * bound. Note that the timestamp lower bound is automatically handled for us
2353   * by the TTL field.
2354   */
2355  @Test
2356  public void testPutWithTsSlop() throws IOException {
2357    byte[] fam = Bytes.toBytes("info");
2358    byte[][] families = { fam };
2359
2360    // add data with a timestamp outside the allowed slop; ensure the sanity check fails
2361    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
2362    this.region = initHRegion(tableName, method, CONF, families);
2363    boolean caughtExcep = false;
2364    try {
2365      // no TS specified == use latest. should not error
2366      region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), Bytes.toBytes("value")));
2367      // TS out of range. should error
2368      region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"),
2369          System.currentTimeMillis() + 2000, Bytes.toBytes("value")));
2370      fail("Expected IOE for TS out of configured timerange");
2371    } catch (FailedSanityCheckException ioe) {
2372      LOG.debug("Received expected exception", ioe);
2373      caughtExcep = true;
2374    }
2375    assertTrue("Should catch FailedSanityCheckException", caughtExcep);
2376  }
2377
2378  @Test
2379  public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
2380    byte[] fam1 = Bytes.toBytes("columnA");
2381    byte[] fam2 = Bytes.toBytes("columnB");
2382    this.region = initHRegion(tableName, method, CONF, fam1, fam2);
2383    byte[] rowA = Bytes.toBytes("rowA");
2384    byte[] rowB = Bytes.toBytes("rowB");
2385
2386    byte[] value = Bytes.toBytes("value");
2387
2388    Delete delete = new Delete(rowA);
2389    delete.addFamily(fam1);
2390
2391    region.delete(delete);
2392
2393    // now create data.
2394    Put put = new Put(rowA);
2395    put.addColumn(fam2, null, value);
2396    region.put(put);
2397
2398    put = new Put(rowB);
2399    put.addColumn(fam1, null, value);
2400    put.addColumn(fam2, null, value);
2401    region.put(put);
2402
2403    Scan scan = new Scan();
2404    scan.addFamily(fam1).addFamily(fam2);
2405    InternalScanner s = region.getScanner(scan);
2406    List<Cell> results = new ArrayList<>();
2407    s.next(results);
2408    assertTrue(CellUtil.matchingRows(results.get(0), rowA));
2409
2410    results.clear();
2411    s.next(results);
2412    assertTrue(CellUtil.matchingRows(results.get(0), rowB));
2413  }
2414
2415  @Test
2416  public void testDataInMemoryWithoutWAL() throws IOException {
2417    FileSystem fs = FileSystem.get(CONF);
2418    Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
2419    FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
2420    hLog.init();
2421    // This chunk creation is done throughout the code base. Do we want to move it into core?
2422    // It is missing from this test. W/o it we NPE.
2423    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
2424        COLUMN_FAMILY_BYTES);
2425
2426    Cell originalCell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
2427      .setRow(row)
2428      .setFamily(COLUMN_FAMILY_BYTES)
2429      .setQualifier(qual1)
2430      .setTimestamp(System.currentTimeMillis())
2431      .setType(KeyValue.Type.Put.getCode())
2432      .setValue(value1)
2433      .build();
2434    final long originalSize = originalCell.getSerializedSize();
2435
2436    Cell addCell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
2437      .setRow(row)
2438      .setFamily(COLUMN_FAMILY_BYTES)
2439      .setQualifier(qual1)
2440      .setTimestamp(System.currentTimeMillis())
2441      .setType(KeyValue.Type.Put.getCode())
2442      .setValue(Bytes.toBytes("xxxxxxxxxx"))
2443      .build();
2444    final long addSize = addCell.getSerializedSize();
2445
2446    LOG.info("originalSize:" + originalSize
2447      + ", addSize:" + addSize);
2448    // start test. We expect that the addPut's durability will be replaced
2449    // by originalPut's durability.
2450
2451    // case 1:
2452    testDataInMemoryWithoutWAL(region,
2453            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2454            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2455            originalSize + addSize);
2456
2457    // case 2:
2458    testDataInMemoryWithoutWAL(region,
2459            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2460            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2461            originalSize + addSize);
2462
2463    // case 3:
2464    testDataInMemoryWithoutWAL(region,
2465            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2466            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2467            0);
2468
2469    // case 4:
2470    testDataInMemoryWithoutWAL(region,
2471            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2472            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2473            0);
2474  }
2475
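      /**
       * Mocks the region's coprocessor host so that preBatchMutate injects addPut alongside
       * originalPut, then asserts that dataInMemoryWithoutWAL grew by exactly the expected delta.
       */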
2476  private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut,
2477          final Put addPut, long delta) throws IOException {
2478    final long initSize = region.getDataInMemoryWithoutWAL();
2479    // save normalCPHost and replace it with mockedCPHost
2480    RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
2481    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
2482    // Because preBatchMutate returns void, we can't use the usual Mockito when...then form;
2483    // we must use the doAnswer form below (see the Mockito docs).
2484    Mockito.doAnswer(new Answer<Void>() {
2485      @Override
2486      public Void answer(InvocationOnMock invocation) throws Throwable {
2487        MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0);
2488        mb.addOperationsFromCP(0, new Mutation[]{addPut});
2489        return null;
2490      }
2491    }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
2492    ColumnFamilyDescriptorBuilder builder = ColumnFamilyDescriptorBuilder.
2493        newBuilder(COLUMN_FAMILY_BYTES);
2494    ScanInfo info = new ScanInfo(CONF, builder.build(), Long.MAX_VALUE,
2495        Long.MAX_VALUE, region.getCellComparator());
2496    Mockito.when(mockedCPHost.preFlushScannerOpen(Mockito.any(HStore.class),
2497        Mockito.any())).thenReturn(info);
2498    Mockito.when(mockedCPHost.preFlush(Mockito.any(), Mockito.any(StoreScanner.class),
2499        Mockito.any())).thenAnswer(i -> i.getArgument(1));
2500    region.setCoprocessorHost(mockedCPHost);
2501
2502    region.put(originalPut);
2503    region.setCoprocessorHost(normalCPHost);
2504    final long finalSize = region.getDataInMemoryWithoutWAL();
2505    assertEquals("finalSize:" + finalSize + ", initSize:"
2506      + initSize + ", delta:" + delta, finalSize, initSize + delta);
2507  }
2508
2509  @Test
2510  public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
2511    Delete delete = new Delete(row);
2512    delete.addColumns(fam1, qual1);
2513    doTestDelete_AndPostInsert(delete);
2514  }
2515
2516  @Test
2517  public void testaddFamily_PostInsert() throws IOException, InterruptedException {
2518    Delete delete = new Delete(row);
2519    delete.addFamily(fam1);
2520    doTestDelete_AndPostInsert(delete);
2521  }
2522
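      /**
       * Shared body for the *_PostInsert tests above: put a value, apply the given delete, put
       * the same column again, then verify both Get and Scan see only the new value.
       */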
2523  public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
2524    this.region = initHRegion(tableName, method, CONF, fam1);
2525    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2526    Put put = new Put(row);
2527    put.addColumn(fam1, qual1, value1);
2528    region.put(put);
2529
2530    // now delete the value:
2531    region.delete(delete);
2532
2533    // ok put data:
2534    put = new Put(row);
2535    put.addColumn(fam1, qual1, value2);
2536    region.put(put);
2537
2538    // ok get:
2539    Get get = new Get(row);
2540    get.addColumn(fam1, qual1);
2541
2542    Result r = region.get(get);
2543    assertEquals(1, r.size());
2544    assertArrayEquals(value2, r.getValue(fam1, qual1));
2545
2546    // next:
2547    Scan scan = new Scan(row);
2548    scan.addColumn(fam1, qual1);
2549    InternalScanner s = region.getScanner(scan);
2550
2551    List<Cell> results = new ArrayList<>();
2552    assertEquals(false, s.next(results));
2553    assertEquals(1, results.size());
2554    Cell kv = results.get(0);
2555
2556    assertArrayEquals(value2, CellUtil.cloneValue(kv));
2557    assertArrayEquals(fam1, CellUtil.cloneFamily(kv));
2558    assertArrayEquals(qual1, CellUtil.cloneQualifier(kv));
2559    assertArrayEquals(row, CellUtil.cloneRow(kv));
2560  }
2561
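      /**
       * Verifies that cells handed to the region in a delete map get real timestamps assigned:
       * every cell in the active memstore segment must carry a timestamp no later than now.
       */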
2562  @Test
2563  public void testDelete_CheckTimestampUpdated() throws IOException {
2564    byte[] row1 = Bytes.toBytes("row1");
2565    byte[] col1 = Bytes.toBytes("col1");
2566    byte[] col2 = Bytes.toBytes("col2");
2567    byte[] col3 = Bytes.toBytes("col3");
2568
2569    // Setting up region
2570    this.region = initHRegion(tableName, method, CONF, fam1);
2571    // Building checkerList
2572    List<Cell> kvs = new ArrayList<>();
2573    kvs.add(new KeyValue(row1, fam1, col1, null));
2574    kvs.add(new KeyValue(row1, fam1, col2, null));
2575    kvs.add(new KeyValue(row1, fam1, col3, null));
2576
2577    NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2578    deleteMap.put(fam1, kvs);
2579    region.delete(deleteMap, Durability.SYNC_WAL);
2580
2581    // extract the key values out of the memstore:
2582    // This is kind of hacky, but better than nothing...
2583    long now = System.currentTimeMillis();
2584    AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore;
2585    Cell firstCell = memstore.getActive().first();
2586    assertTrue(firstCell.getTimestamp() <= now);
2587    now = firstCell.getTimestamp();
2588    for (Cell cell : memstore.getActive().getCellSet()) {
2589      assertTrue(cell.getTimestamp() <= now);
2590      now = cell.getTimestamp();
2591    }
2592  }
2593
2594  // ////////////////////////////////////////////////////////////////////////////
2595  // Get tests
2596  // ////////////////////////////////////////////////////////////////////////////
2597  @Test
2598  public void testGet_FamilyChecker() throws IOException {
2599    byte[] row1 = Bytes.toBytes("row1");
2600    byte[] fam1 = Bytes.toBytes("fam1");
2601    byte[] fam2 = Bytes.toBytes("False");
2602    byte[] col1 = Bytes.toBytes("col1");
2603
2604    // Setting up region
2605    this.region = initHRegion(tableName, method, CONF, fam1);
2606    Get get = new Get(row1);
2607    get.addColumn(fam2, col1);
2608
2609    // Test
2610    try {
2611      region.get(get);
2612      fail("Expecting DoNotRetryIOException in get but did not get any");
2613    } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
2614      LOG.info("Got expected DoNotRetryIOException successfully");
2615    }
2616  }
2617
2618  @Test
2619  public void testGet_Basic() throws IOException {
2620    byte[] row1 = Bytes.toBytes("row1");
2621    byte[] fam1 = Bytes.toBytes("fam1");
2622    byte[] col1 = Bytes.toBytes("col1");
2623    byte[] col2 = Bytes.toBytes("col2");
2624    byte[] col3 = Bytes.toBytes("col3");
2625    byte[] col4 = Bytes.toBytes("col4");
2626    byte[] col5 = Bytes.toBytes("col5");
2627
2628    // Setting up region
2629    this.region = initHRegion(tableName, method, CONF, fam1);
2630    // Add to memstore
2631    Put put = new Put(row1);
2632    put.addColumn(fam1, col1, null);
2633    put.addColumn(fam1, col2, null);
2634    put.addColumn(fam1, col3, null);
2635    put.addColumn(fam1, col4, null);
2636    put.addColumn(fam1, col5, null);
2637    region.put(put);
2638
2639    Get get = new Get(row1);
2640    get.addColumn(fam1, col2);
2641    get.addColumn(fam1, col4);
2642    // Expected result
2643    KeyValue kv1 = new KeyValue(row1, fam1, col2);
2644    KeyValue kv2 = new KeyValue(row1, fam1, col4);
2645    KeyValue[] expected = { kv1, kv2 };
2646
2647    // Test
2648    Result res = region.get(get);
2649    assertEquals(expected.length, res.size());
2650    for (int i = 0; i < res.size(); i++) {
2651      assertTrue(CellUtil.matchingRows(expected[i], res.rawCells()[i]));
2652      assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i]));
2653      assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i]));
2654    }
2655
2656    // Test using a filter on a Get
2657    Get g = new Get(row1);
2658    final int count = 2;
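    // ColumnCountGetFilter includes only the first 'count' columns of the row, so the Get
    // below should return exactly two cells.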
2659    g.setFilter(new ColumnCountGetFilter(count));
2660    res = region.get(g);
2661    assertEquals(count, res.size());
2662  }
2663
2664  @Test
2665  public void testGet_Empty() throws IOException {
2666    byte[] row = Bytes.toBytes("row");
2667    byte[] fam = Bytes.toBytes("fam");
2668
2669    this.region = initHRegion(tableName, method, CONF, fam);
2670    Get get = new Get(row);
2671    get.addFamily(fam);
2672    Result r = region.get(get);
2673
2674    assertTrue(r.isEmpty());
2675  }
2676
2677  @Test
2678  public void testGetWithFilter() throws IOException, InterruptedException {
2679    byte[] row1 = Bytes.toBytes("row1");
2680    byte[] fam1 = Bytes.toBytes("fam1");
2681    byte[] col1 = Bytes.toBytes("col1");
2682    byte[] value1 = Bytes.toBytes("value1");
2683    byte[] value2 = Bytes.toBytes("value2");
2684
2685    final int maxVersions = 3;
2686    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
2687    hcd.setMaxVersions(maxVersions);
2688    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker"));
2689    htd.addFamily(hcd);
2690    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
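    // ChunkCreator is initialized up front because this test builds the region directly
    // instead of going through a region server.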
2691    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
2692    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
2693    final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
2694    this.region = TEST_UTIL.createLocalHRegion(info, htd, wal);
2695
2696    // Put 4 versions to the memstore
2697    long ts = 0;
2698    Put put = new Put(row1, ts);
2699    put.addColumn(fam1, col1, value1);
2700    region.put(put);
2701    put = new Put(row1, ts + 1);
2702    put.addColumn(fam1, col1, Bytes.toBytes("filter1"));
2703    region.put(put);
2704    put = new Put(row1, ts + 2);
2705    put.addColumn(fam1, col1, Bytes.toBytes("filter2"));
2706    region.put(put);
2707    put = new Put(row1, ts + 3);
2708    put.addColumn(fam1, col1, value2);
2709    region.put(put);
2710
2711    Get get = new Get(row1);
2712    get.readAllVersions();
2713    Result res = region.get(get);
2714    // Get 3 versions; the oldest version is no longer visible to the user
2715    assertEquals(maxVersions, res.size());
2716
2717    get.setFilter(new ValueFilter(CompareOperator.EQUAL, new SubstringComparator("value")));
2718    res = region.get(get);
2719    // When using a value filter, the oldest version should still be gone from the user view
2720    // and only one key value should be returned
2721    assertEquals(1, res.size());
2722    assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2723    assertEquals(ts + 3, res.rawCells()[0].getTimestamp());
2724
2725    region.flush(true);
2726    region.compact(true);
2727    Thread.sleep(1000);
2728    res = region.get(get);
2729    // After flush and compact, the result should be consistent with the previous result
2730    assertEquals(1, res.size());
2731    assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2732  }
2733
2734  // ////////////////////////////////////////////////////////////////////////////
2735  // Scanner tests
2736  // ////////////////////////////////////////////////////////////////////////////
2737  @Test
2738  public void testGetScanner_WithOkFamilies() throws IOException {
2739    byte[] fam1 = Bytes.toBytes("fam1");
2740    byte[] fam2 = Bytes.toBytes("fam2");
2741
2742    byte[][] families = { fam1, fam2 };
2743
2744    // Setting up region
2745    this.region = initHRegion(tableName, method, CONF, families);
2746    Scan scan = new Scan();
2747    scan.addFamily(fam1);
2748    scan.addFamily(fam2);
2749    try {
2750      region.getScanner(scan);
2751    } catch (Exception e) {
2752      assertTrue("Families could not be found in Region", false);
2753    }
2754  }
2755
2756  @Test
2757  public void testGetScanner_WithNotOkFamilies() throws IOException {
2758    byte[] fam1 = Bytes.toBytes("fam1");
2759    byte[] fam2 = Bytes.toBytes("fam2");
2760
2761    byte[][] families = { fam1 };
2762
2763    // Setting up region
2764    this.region = initHRegion(tableName, method, CONF, families);
2765    Scan scan = new Scan();
2766    scan.addFamily(fam2);
2767    boolean ok = false;
2768    try {
2769      region.getScanner(scan);
2770    } catch (Exception e) {
2771      ok = true;
2772    }
2773    assertTrue("Families could not be found in Region", ok);
2774  }
2775
2776  @Test
2777  public void testGetScanner_WithNoFamilies() throws IOException {
2778    byte[] row1 = Bytes.toBytes("row1");
2779    byte[] fam1 = Bytes.toBytes("fam1");
2780    byte[] fam2 = Bytes.toBytes("fam2");
2781    byte[] fam3 = Bytes.toBytes("fam3");
2782    byte[] fam4 = Bytes.toBytes("fam4");
2783
2784    byte[][] families = { fam1, fam2, fam3, fam4 };
2785
2786    // Setting up region
2787    this.region = initHRegion(tableName, method, CONF, families);
2788    // Putting data in Region
2789    Put put = new Put(row1);
2790    put.addColumn(fam1, null, null);
2791    put.addColumn(fam2, null, null);
2792    put.addColumn(fam3, null, null);
2793    put.addColumn(fam4, null, null);
2794    region.put(put);
2795
2796    Scan scan = null;
2797    HRegion.RegionScannerImpl is = null;
2798
2799    // Testing how many store scanners getScanner produces. Two families are requested,
2800    // but one scanner is immediately polled off the heap as the current scanner,
2801    // so the heap holds 2 - 1 = 1.
2802    scan = new Scan();
2803    scan.addFamily(fam2);
2804    scan.addFamily(fam4);
2805    is = region.getScanner(scan);
2806    assertEquals(1, is.storeHeap.getHeap().size());
2807
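    // With no families on the scan, all four stores are included; again one scanner becomes
    // the current scanner, leaving families.length - 1 on the heap.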
2808    scan = new Scan();
2809    is = region.getScanner(scan);
2810    assertEquals(families.length - 1, is.storeHeap.getHeap().size());
2811  }
2812
2813  /**
2814   * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
2815   *
2816   * @throws IOException
2817   */
2818  @Test
2819  public void testGetScanner_WithRegionClosed() throws IOException {
2820    byte[] fam1 = Bytes.toBytes("fam1");
2821    byte[] fam2 = Bytes.toBytes("fam2");
2822
2823    byte[][] families = { fam1, fam2 };
2824
2825    // Setting up region
2826    try {
2827      this.region = initHRegion(tableName, method, CONF, families);
2828    } catch (IOException e) {
2829      e.printStackTrace();
2830      fail("Got IOException during initHRegion, " + e.getMessage());
2831    }
2832    region.closed.set(true);
2833    try {
2834      region.getScanner(null);
2835      fail("Expected to get an exception during getScanner on a region that is closed");
2836    } catch (NotServingRegionException e) {
2837      // this is the correct exception that is expected
2838    } catch (IOException e) {
2839      fail("Got wrong type of exception - should be a NotServingRegionException, " +
2840          "but was an IOException: "
2841          + e.getMessage());
2842    }
2843  }
2844
2845  @Test
2846  public void testRegionScanner_Next() throws IOException {
2847    byte[] row1 = Bytes.toBytes("row1");
2848    byte[] row2 = Bytes.toBytes("row2");
2849    byte[] fam1 = Bytes.toBytes("fam1");
2850    byte[] fam2 = Bytes.toBytes("fam2");
2851    byte[] fam3 = Bytes.toBytes("fam3");
2852    byte[] fam4 = Bytes.toBytes("fam4");
2853
2854    byte[][] families = { fam1, fam2, fam3, fam4 };
2855    long ts = System.currentTimeMillis();
2856
2857    // Setting up region
2858    this.region = initHRegion(tableName, method, CONF, families);
2859    // Putting data in Region
2860    Put put = null;
2861    put = new Put(row1);
2862    put.addColumn(fam1, (byte[]) null, ts, null);
2863    put.addColumn(fam2, (byte[]) null, ts, null);
2864    put.addColumn(fam3, (byte[]) null, ts, null);
2865    put.addColumn(fam4, (byte[]) null, ts, null);
2866    region.put(put);
2867
2868    put = new Put(row2);
2869    put.addColumn(fam1, (byte[]) null, ts, null);
2870    put.addColumn(fam2, (byte[]) null, ts, null);
2871    put.addColumn(fam3, (byte[]) null, ts, null);
2872    put.addColumn(fam4, (byte[]) null, ts, null);
2873    region.put(put);
2874
2875    Scan scan = new Scan();
2876    scan.addFamily(fam2);
2877    scan.addFamily(fam4);
2878    InternalScanner is = region.getScanner(scan);
2879
2880    List<Cell> res = null;
2881
2882    // Result 1
2883    List<Cell> expected1 = new ArrayList<>();
2884    expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
2885    expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
2886
2887    res = new ArrayList<>();
2888    is.next(res);
2889    for (int i = 0; i < res.size(); i++) {
2890      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected1.get(i), res.get(i)));
2891    }
2892
2893    // Result 2
2894    List<Cell> expected2 = new ArrayList<>();
2895    expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
2896    expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
2897
2898    res = new ArrayList<>();
2899    is.next(res);
2900    for (int i = 0; i < res.size(); i++) {
2901      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected2.get(i), res.get(i)));
2902    }
2903  }
2904
2905  @Test
2906  public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException {
2907    byte[] row1 = Bytes.toBytes("row1");
2908    byte[] qf1 = Bytes.toBytes("qualifier1");
2909    byte[] qf2 = Bytes.toBytes("qualifier2");
2910    byte[] fam1 = Bytes.toBytes("fam1");
2911    byte[][] families = { fam1 };
2912
2913    long ts1 = System.currentTimeMillis();
2914    long ts2 = ts1 + 1;
2915    long ts3 = ts1 + 2;
2916
2917    // Setting up region
2918    this.region = initHRegion(tableName, method, CONF, families);
2919    // Putting data in Region
2920    Put put = null;
2921    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2922    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2923    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2924
2925    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2926    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2927    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2928
2929    put = new Put(row1);
2930    put.add(kv13);
2931    put.add(kv12);
2932    put.add(kv11);
2933    put.add(kv23);
2934    put.add(kv22);
2935    put.add(kv21);
2936    region.put(put);
2937
2938    // Expected
2939    List<Cell> expected = new ArrayList<>();
2940    expected.add(kv13);
2941    expected.add(kv12);
2942
2943    Scan scan = new Scan(row1);
2944    scan.addColumn(fam1, qf1);
2945    scan.setMaxVersions(MAX_VERSIONS);
2946    List<Cell> actual = new ArrayList<>();
2947    InternalScanner scanner = region.getScanner(scan);
2948
2949    boolean hasNext = scanner.next(actual);
2950    assertEquals(false, hasNext);
2951
2952    // Verify result
2953    for (int i = 0; i < expected.size(); i++) {
2954      assertEquals(expected.get(i), actual.get(i));
2955    }
2956  }
2957
2958  @Test
2959  public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException {
2960    byte[] row1 = Bytes.toBytes("row1");
2961    byte[] qf1 = Bytes.toBytes("qualifier1");
2962    byte[] qf2 = Bytes.toBytes("qualifier2");
2963    byte[] fam1 = Bytes.toBytes("fam1");
2964    byte[][] families = { fam1 };
2965
2966    long ts1 = 1; // System.currentTimeMillis();
2967    long ts2 = ts1 + 1;
2968    long ts3 = ts1 + 2;
2969
2970    // Setting up region
2971    this.region = initHRegion(tableName, method, CONF, families);
2972    // Putting data in Region
2973    Put put = null;
2974    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2975    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2976    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2977
2978    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2979    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2980    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2981
2982    put = new Put(row1);
2983    put.add(kv13);
2984    put.add(kv12);
2985    put.add(kv11);
2986    put.add(kv23);
2987    put.add(kv22);
2988    put.add(kv21);
2989    region.put(put);
2990    region.flush(true);
2991
2992    // Expected
2993    List<Cell> expected = new ArrayList<>();
2994    expected.add(kv13);
2995    expected.add(kv12);
2996    expected.add(kv23);
2997    expected.add(kv22);
2998
2999    Scan scan = new Scan(row1);
3000    scan.addColumn(fam1, qf1);
3001    scan.addColumn(fam1, qf2);
3002    scan.setMaxVersions(MAX_VERSIONS);
3003    List<Cell> actual = new ArrayList<>();
3004    InternalScanner scanner = region.getScanner(scan);
3005
3006    boolean hasNext = scanner.next(actual);
3007    assertEquals(false, hasNext);
3008
3009    // Verify result
3010    for (int i = 0; i < expected.size(); i++) {
3011      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3012    }
3013  }
3014
3015  @Test
3016  public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws
3017      IOException {
3018    byte[] row1 = Bytes.toBytes("row1");
3019    byte[] fam1 = Bytes.toBytes("fam1");
3020    byte[][] families = { fam1 };
3021    byte[] qf1 = Bytes.toBytes("qualifier1");
3022    byte[] qf2 = Bytes.toBytes("qualifier2");
3023
3024    long ts1 = 1;
3025    long ts2 = ts1 + 1;
3026    long ts3 = ts1 + 2;
3027    long ts4 = ts1 + 3;
3028
3029    // Setting up region
3030    this.region = initHRegion(tableName, method, CONF, families);
3031    // Putting data in Region
3032    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3033    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3034    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3035    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3036
3037    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3038    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3039    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3040    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3041
3042    Put put = null;
3043    put = new Put(row1);
3044    put.add(kv14);
3045    put.add(kv24);
3046    region.put(put);
3047    region.flush(true);
3048
3049    put = new Put(row1);
3050    put.add(kv23);
3051    put.add(kv13);
3052    region.put(put);
3053    region.flush(true);
3054
3055    put = new Put(row1);
3056    put.add(kv22);
3057    put.add(kv12);
3058    region.put(put);
3059    region.flush(true);
3060
3061    put = new Put(row1);
3062    put.add(kv21);
3063    put.add(kv11);
3064    region.put(put);
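    // Three store files on disk plus one final put left in the memstore: the scan below must
    // collate versions across the files and the memstore.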
3065
3066    // Expected
3067    List<Cell> expected = new ArrayList<>();
3068    expected.add(kv14);
3069    expected.add(kv13);
3070    expected.add(kv12);
3071    expected.add(kv24);
3072    expected.add(kv23);
3073    expected.add(kv22);
3074
3075    Scan scan = new Scan(row1);
3076    scan.addColumn(fam1, qf1);
3077    scan.addColumn(fam1, qf2);
3078    int versions = 3;
3079    scan.setMaxVersions(versions);
3080    List<Cell> actual = new ArrayList<>();
3081    InternalScanner scanner = region.getScanner(scan);
3082
3083    boolean hasNext = scanner.next(actual);
3084    assertEquals(false, hasNext);
3085
3086    // Verify result
3087    for (int i = 0; i < expected.size(); i++) {
3088      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3089    }
3090  }
3091
3092  @Test
3093  public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException {
3094    byte[] row1 = Bytes.toBytes("row1");
3095    byte[] qf1 = Bytes.toBytes("qualifier1");
3096    byte[] qf2 = Bytes.toBytes("qualifier2");
3097    byte[] fam1 = Bytes.toBytes("fam1");
3098    byte[][] families = { fam1 };
3099
3100    long ts1 = System.currentTimeMillis();
3101    long ts2 = ts1 + 1;
3102    long ts3 = ts1 + 2;
3103
3104    // Setting up region
3105    this.region = initHRegion(tableName, method, CONF, families);
3106    // Putting data in Region
3107    Put put = null;
3108    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3109    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3110    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3111
3112    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3113    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3114    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3115
3116    put = new Put(row1);
3117    put.add(kv13);
3118    put.add(kv12);
3119    put.add(kv11);
3120    put.add(kv23);
3121    put.add(kv22);
3122    put.add(kv21);
3123    region.put(put);
3124
3125    // Expected
3126    List<Cell> expected = new ArrayList<>();
3127    expected.add(kv13);
3128    expected.add(kv12);
3129    expected.add(kv23);
3130    expected.add(kv22);
3131
3132    Scan scan = new Scan(row1);
3133    scan.addFamily(fam1);
3134    scan.setMaxVersions(MAX_VERSIONS);
3135    List<Cell> actual = new ArrayList<>();
3136    InternalScanner scanner = region.getScanner(scan);
3137
3138    boolean hasNext = scanner.next(actual);
3139    assertEquals(false, hasNext);
3140
3141    // Verify result
3142    for (int i = 0; i < expected.size(); i++) {
3143      assertEquals(expected.get(i), actual.get(i));
3144    }
3145  }
3146
3147  @Test
3148  public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException {
3149    byte[] row1 = Bytes.toBytes("row1");
3150    byte[] qf1 = Bytes.toBytes("qualifier1");
3151    byte[] qf2 = Bytes.toBytes("qualifier2");
3152    byte[] fam1 = Bytes.toBytes("fam1");
3153
3154    long ts1 = 1; // System.currentTimeMillis();
3155    long ts2 = ts1 + 1;
3156    long ts3 = ts1 + 2;
3157
3158    // Setting up region
3159    this.region = initHRegion(tableName, method, CONF, fam1);
3160    // Putting data in Region
3161    Put put = null;
3162    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3163    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3164    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3165
3166    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3167    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3168    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3169
3170    put = new Put(row1);
3171    put.add(kv13);
3172    put.add(kv12);
3173    put.add(kv11);
3174    put.add(kv23);
3175    put.add(kv22);
3176    put.add(kv21);
3177    region.put(put);
3178    region.flush(true);
3179
3180    // Expected
3181    List<Cell> expected = new ArrayList<>();
3182    expected.add(kv13);
3183    expected.add(kv12);
3184    expected.add(kv23);
3185    expected.add(kv22);
3186
3187    Scan scan = new Scan(row1);
3188    scan.addFamily(fam1);
3189    scan.setMaxVersions(MAX_VERSIONS);
3190    List<Cell> actual = new ArrayList<>();
3191    InternalScanner scanner = region.getScanner(scan);
3192
3193    boolean hasNext = scanner.next(actual);
3194    assertEquals(false, hasNext);
3195
3196    // Verify result
3197    for (int i = 0; i < expected.size(); i++) {
3198      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3199    }
3200  }
3201
3202  @Test
3203  public void testScanner_StopRow1542() throws IOException {
3204    byte[] family = Bytes.toBytes("testFamily");
3205    this.region = initHRegion(tableName, method, CONF, family);
3206    byte[] row1 = Bytes.toBytes("row111");
3207    byte[] row2 = Bytes.toBytes("row222");
3208    byte[] row3 = Bytes.toBytes("row333");
3209    byte[] row4 = Bytes.toBytes("row444");
3210    byte[] row5 = Bytes.toBytes("row555");
3211
3212    byte[] col1 = Bytes.toBytes("Pub111");
3213    byte[] col2 = Bytes.toBytes("Pub222");
3214
3215    Put put = new Put(row1);
3216    put.addColumn(family, col1, Bytes.toBytes(10L));
3217    region.put(put);
3218
3219    put = new Put(row2);
3220    put.addColumn(family, col1, Bytes.toBytes(15L));
3221    region.put(put);
3222
3223    put = new Put(row3);
3224    put.addColumn(family, col2, Bytes.toBytes(20L));
3225    region.put(put);
3226
3227    put = new Put(row4);
3228    put.addColumn(family, col2, Bytes.toBytes(30L));
3229    region.put(put);
3230
3231    put = new Put(row5);
3232    put.addColumn(family, col1, Bytes.toBytes(40L));
3233    region.put(put);
3234
3235    Scan scan = new Scan(row3, row4);
3236    scan.setMaxVersions();
3237    scan.addColumn(family, col1);
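    // row333 only has a value in Pub222 and row444 is the exclusive stop row, so scanning
    // [row333, row444) for Pub111 must return nothing.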
3238    InternalScanner s = region.getScanner(scan);
3239
3240    List<Cell> results = new ArrayList<>();
3241    assertEquals(false, s.next(results));
3242    assertEquals(0, results.size());
3243  }
3244
3245  @Test
3246  public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
3247    byte[] row1 = Bytes.toBytes("row1");
3248    byte[] fam1 = Bytes.toBytes("fam1");
3249    byte[] qf1 = Bytes.toBytes("qualifier1");
3250    byte[] qf2 = Bytes.toBytes("qualifier2");
3251
3252    long ts1 = 1;
3253    long ts2 = ts1 + 1;
3254    long ts3 = ts1 + 2;
3255    long ts4 = ts1 + 3;
3256
3257    // Setting up region
3258    this.region = initHRegion(tableName, method, CONF, fam1);
3259    // Putting data in Region
3260    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3261    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3262    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3263    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3264
3265    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3266    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3267    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3268    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3269
3270    Put put = null;
3271    put = new Put(row1);
3272    put.add(kv14);
3273    put.add(kv24);
3274    region.put(put);
3275    region.flush(true);
3276
3277    put = new Put(row1);
3278    put.add(kv23);
3279    put.add(kv13);
3280    region.put(put);
3281    region.flush(true);
3282
3283    put = new Put(row1);
3284    put.add(kv22);
3285    put.add(kv12);
3286    region.put(put);
3287    region.flush(true);
3288
3289    put = new Put(row1);
3290    put.add(kv21);
3291    put.add(kv11);
3292    region.put(put);
3293
3294    // Expected
3295    List<KeyValue> expected = new ArrayList<>();
3296    expected.add(kv14);
3297    expected.add(kv13);
3298    expected.add(kv12);
3299    expected.add(kv24);
3300    expected.add(kv23);
3301    expected.add(kv22);
3302
3303    Scan scan = new Scan(row1);
3304    int versions = 3;
3305    scan.setMaxVersions(versions);
3306    List<Cell> actual = new ArrayList<>();
3307    InternalScanner scanner = region.getScanner(scan);
3308
3309    boolean hasNext = scanner.next(actual);
3310    assertEquals(false, hasNext);
3311
3312    // Verify result
3313    for (int i = 0; i < expected.size(); i++) {
3314      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3315    }
3316  }
3317
3318  /**
3319   * Added for HBASE-5416
3320   *
3321   * Here we test the scan optimization used when only a subset of the CFs is referenced in
3322   * filter conditions.
3323   */
3324  @Test
3325  public void testScanner_JoinedScanners() throws IOException {
3326    byte[] cf_essential = Bytes.toBytes("essential");
3327    byte[] cf_joined = Bytes.toBytes("joined");
3328    byte[] cf_alpha = Bytes.toBytes("alpha");
3329    this.region = initHRegion(tableName, method, CONF, cf_essential, cf_joined, cf_alpha);
3330    byte[] row1 = Bytes.toBytes("row1");
3331    byte[] row2 = Bytes.toBytes("row2");
3332    byte[] row3 = Bytes.toBytes("row3");
3333
3334    byte[] col_normal = Bytes.toBytes("d");
3335    byte[] col_alpha = Bytes.toBytes("a");
3336
3337    byte[] filtered_val = Bytes.toBytes(3);
3338
3339    Put put = new Put(row1);
3340    put.addColumn(cf_essential, col_normal, Bytes.toBytes(1));
3341    put.addColumn(cf_joined, col_alpha, Bytes.toBytes(1));
3342    region.put(put);
3343
3344    put = new Put(row2);
3345    put.addColumn(cf_essential, col_alpha, Bytes.toBytes(2));
3346    put.addColumn(cf_joined, col_normal, Bytes.toBytes(2));
3347    put.addColumn(cf_alpha, col_alpha, Bytes.toBytes(2));
3348    region.put(put);
3349
3350    put = new Put(row3);
3351    put.addColumn(cf_essential, col_normal, filtered_val);
3352    put.addColumn(cf_joined, col_normal, filtered_val);
3353    region.put(put);
3354
3355    // Check two things:
3356    // 1. result list contains expected values
3357    // 2. result list is sorted properly
3358
3359    Scan scan = new Scan();
3360    Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
3361            CompareOperator.NOT_EQUAL, filtered_val);
3362    scan.setFilter(filter);
3363    scan.setLoadColumnFamiliesOnDemand(true);
3364    InternalScanner s = region.getScanner(scan);
3365
3366    List<Cell> results = new ArrayList<>();
3367    assertTrue(s.next(results));
3368    assertEquals(1, results.size());
3369    results.clear();
3370
3371    assertTrue(s.next(results));
3372    assertEquals(3, results.size());
3373    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha));
3374    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential));
3375    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined));
3376    results.clear();
3377
3378    assertFalse(s.next(results));
3379    assertEquals(0, results.size());
3380  }
3381
3382  /**
3383   * HBASE-5416
3384   *
3385   * Test case where the scan limits the number of KVs returned on each next() call.
3386   */
3387  @Test
3388  public void testScanner_JoinedScannersWithLimits() throws IOException {
3389    final byte[] cf_first = Bytes.toBytes("first");
3390    final byte[] cf_second = Bytes.toBytes("second");
3391
3392    this.region = initHRegion(tableName, method, CONF, cf_first, cf_second);
3393    final byte[] col_a = Bytes.toBytes("a");
3394    final byte[] col_b = Bytes.toBytes("b");
3395
3396    Put put;
3397
3398    for (int i = 0; i < 10; i++) {
3399      put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
3400      put.addColumn(cf_first, col_a, Bytes.toBytes(i));
3401      if (i < 5) {
3402        put.addColumn(cf_first, col_b, Bytes.toBytes(i));
3403        put.addColumn(cf_second, col_a, Bytes.toBytes(i));
3404        put.addColumn(cf_second, col_b, Bytes.toBytes(i));
3405      }
3406      region.put(put);
3407    }
3408
3409    Scan scan = new Scan();
3410    scan.setLoadColumnFamiliesOnDemand(true);
3411    Filter bogusFilter = new FilterBase() {
3412      @Override
3413      public ReturnCode filterCell(final Cell ignored) throws IOException {
3414        return ReturnCode.INCLUDE;
3415      }
3416      @Override
3417      public boolean isFamilyEssential(byte[] name) {
3418        return Bytes.equals(name, cf_first);
3419      }
3420    };
3421
3422    scan.setFilter(bogusFilter);
3423    InternalScanner s = region.getScanner(scan);
3424
3425    // Our data looks like this:
3426    // r0: first:a, first:b, second:a, second:b
3427    // r1: first:a, first:b, second:a, second:b
3428    // r2: first:a, first:b, second:a, second:b
3429    // r3: first:a, first:b, second:a, second:b
3430    // r4: first:a, first:b, second:a, second:b
3431    // r5: first:a
3432    // r6: first:a
3433    // r7: first:a
3434    // r8: first:a
3435    // r9: first:a
3436
3437    // But due to next's limit set to 3, we should get this:
3438    // r0: first:a, first:b, second:a
3439    // r0: second:b
3440    // r1: first:a, first:b, second:a
3441    // r1: second:b
3442    // r2: first:a, first:b, second:a
3443    // r2: second:b
3444    // r3: first:a, first:b, second:a
3445    // r3: second:b
3446    // r4: first:a, first:b, second:a
3447    // r4: second:b
3448    // r5: first:a
3449    // r6: first:a
3450    // r7: first:a
3451    // r8: first:a
3452    // r9: first:a
3453
3454    List<Cell> results = new ArrayList<>();
3455    int index = 0;
3456    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build();
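    // With a batch limit of 3, each of the first five rows (4 cells each) comes back as a
    // batch of 3 followed by a batch of 1; index >> 1 maps both batches back to the same row.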
3457    while (true) {
3458      boolean more = s.next(results, scannerContext);
3459      if ((index >> 1) < 5) {
3460        if (index % 2 == 0) {
3461          assertEquals(3, results.size());
3462        } else {
3463          assertEquals(1, results.size());
3464        }
3465      } else {
3466        assertEquals(1, results.size());
3467      }
3468      results.clear();
3469      index++;
3470      if (!more) {
3471        break;
3472      }
3473    }
3474  }
3475
3476  /**
3477   * Write an HFile block full of Cells whose qualifiers are identical for the first
3478   * Short.MAX_VALUE+1 bytes and differ only in the final byte. See HBASE-13329.
3479   * @throws Exception
3480   */
3481  @Test
3482  public void testLongQualifier() throws Exception {
3483    byte[] family = Bytes.toBytes("family");
3484    this.region = initHRegion(tableName, method, CONF, family);
3485    byte[] q = new byte[Short.MAX_VALUE+2];
3486    Arrays.fill(q, 0, q.length-1, (byte)42);
3487    for (byte i=0; i<10; i++) {
3488      Put p = new Put(Bytes.toBytes("row"));
3489      // qualifiers that differ past Short.MAX_VALUE
3490      q[q.length-1]=i;
3491      p.addColumn(family, q, q);
3492      region.put(p);
3493    }
3494    region.flush(false);
3495  }
3496
3497  /**
3498   * Flushes the cache in a thread while scanning. The test verifies that the
3499   * scan is coherent - e.g. the returned results are always from the same or
3500   * a later update than the previous results.
3501   *
3502   * @throws IOException
3503   *           scan / compact
3504   * @throws InterruptedException
3505   *           thread join
3506   */
3507  @Test
3508  public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
3509    byte[] family = Bytes.toBytes("family");
3510    int numRows = 1000;
3511    int flushAndScanInterval = 10;
3512    int compactInterval = 10 * flushAndScanInterval;
3513
3514    this.region = initHRegion(tableName, method, CONF, family);
3515    FlushThread flushThread = new FlushThread();
3516    try {
3517      flushThread.start();
3518
3519      Scan scan = new Scan();
3520      scan.addFamily(family);
3521      scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOperator.EQUAL,
3522          new BinaryComparator(Bytes.toBytes(5L))));
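      // Only rows whose qual1 value equals 5 pass the filter, i.e. rows where i % 10 == 5;
      // expectedCount below tracks exactly those rows.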
3523
3524      int expectedCount = 0;
3525      List<Cell> res = new ArrayList<>();
3526
3527      boolean toggle = true;
3528      for (long i = 0; i < numRows; i++) {
3529        Put put = new Put(Bytes.toBytes(i));
3530        put.setDurability(Durability.SKIP_WAL);
3531        put.addColumn(family, qual1, Bytes.toBytes(i % 10));
3532        region.put(put);
3533
3534        if (i != 0 && i % compactInterval == 0) {
3535          LOG.debug("iteration = " + i + " ts=" + System.currentTimeMillis());
3536          region.compact(true);
3537        }
3538
3539        if (i % 10 == 5L) {
3540          expectedCount++;
3541        }
3542
3543        if (i != 0 && i % flushAndScanInterval == 0) {
3544          res.clear();
3545          InternalScanner scanner = region.getScanner(scan);
3546          if (toggle) {
3547            flushThread.flush();
3548          }
3549          while (scanner.next(res))
3550            ;
3551          if (!toggle) {
3552            flushThread.flush();
3553          }
3554          assertEquals("toggle="+toggle+"i=" + i + " ts="+System.currentTimeMillis(),
3555              expectedCount, res.size());
3556          toggle = !toggle;
3557        }
3558      }
3559
3560    } finally {
3561      try {
3562        flushThread.done();
3563        flushThread.join();
3564        flushThread.checkNoError();
3565      } catch (InterruptedException ie) {
3566        LOG.warn("Caught exception when joining with flushThread", ie);
3567      }
3568      HBaseTestingUtility.closeRegionAndWAL(this.region);
3569      this.region = null;
3570    }
3571  }
3572
3573  protected class FlushThread extends Thread {
3574    private volatile boolean done;
3575    private Throwable error = null;
3576
3577    FlushThread() {
3578      super("FlushThread");
3579    }
3580
3581    public void done() {
3582      done = true;
3583      synchronized (this) {
3584        interrupt();
3585      }
3586    }
3587
3588    public void checkNoError() {
3589      if (error != null) {
3590        assertNull(error);
3591      }
3592    }
3593
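    // Park until poked via flush(); done() sets the flag and interrupts the wait so the
    // thread can exit promptly.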
3594    @Override
3595    public void run() {
3596      done = false;
3597      while (!done) {
3598        synchronized (this) {
3599          try {
3600            wait();
3601          } catch (InterruptedException ignored) {
3602            if (done) {
3603              break;
3604            }
3605          }
3606        }
3607        try {
3608          region.flush(true);
3609        } catch (IOException e) {
3610          if (!done) {
3611            LOG.error("Error while flushing cache", e);
3612            error = e;
3613          }
3614          break;
3615        } catch (Throwable t) {
3616          LOG.error("Uncaught exception", t);
3617          throw t;
3618        }
3619      }
3620    }
3621
3622    public void flush() {
3623      synchronized (this) {
3624        notify();
3625      }
3626    }
3627  }
3628
3629  /**
3630   * Writes very wide records and scans for the latest every time. Flushes and
3631   * compacts the region every now and then to keep things realistic.
3632   *
3633   * @throws IOException
3634   *           by flush / scan / compaction
3635   * @throws InterruptedException
3636   *           when joining threads
3637   */
3638  @Test
3639  public void testWritesWhileScanning() throws IOException, InterruptedException {
3640    int testCount = 100;
3641    int numRows = 1;
3642    int numFamilies = 10;
3643    int numQualifiers = 100;
3644    int flushInterval = 7;
3645    int compactInterval = 5 * flushInterval;
3646    byte[][] families = new byte[numFamilies][];
3647    for (int i = 0; i < numFamilies; i++) {
3648      families[i] = Bytes.toBytes("family" + i);
3649    }
3650    byte[][] qualifiers = new byte[numQualifiers][];
3651    for (int i = 0; i < numQualifiers; i++) {
3652      qualifiers[i] = Bytes.toBytes("qual" + i);
3653    }
3654
3655    this.region = initHRegion(tableName, method, CONF, families);
3656    FlushThread flushThread = new FlushThread();
3657    PutThread putThread = new PutThread(numRows, families, qualifiers);
3658    try {
3659      putThread.start();
3660      putThread.waitForFirstPut();
3661
3662      flushThread.start();
3663
3664      Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
3665
3666      int expectedCount = numFamilies * numQualifiers;
3667      List<Cell> res = new ArrayList<>();
3668
3669      long prevTimestamp = 0L;
3670      for (int i = 0; i < testCount; i++) {
3671
3672        if (i != 0 && i % compactInterval == 0) {
3673          region.compact(true);
3674          for (HStore store : region.getStores()) {
3675            store.closeAndArchiveCompactedFiles();
3676          }
3677        }
3678
3679        if (i != 0 && i % flushInterval == 0) {
3680          flushThread.flush();
3681        }
3682
3683        boolean previousEmpty = res.isEmpty();
3684        res.clear();
3685        try (InternalScanner scanner = region.getScanner(scan)) {
3686          boolean moreRows;
3687          do {
3688            moreRows = scanner.next(res);
3689          } while (moreRows);
3690        }
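        // Skip the check only while both this scan and the previous one came back empty and we
        // are still within the first compaction interval; otherwise the full wide row must be
        // present with non-decreasing timestamps.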
3691        if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
3692          assertEquals("i=" + i, expectedCount, res.size());
3693          long timestamp = res.get(0).getTimestamp();
3694          assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
3695              timestamp >= prevTimestamp);
3696          prevTimestamp = timestamp;
3697        }
3698      }
3699
3700      putThread.done();
3701
3702      region.flush(true);
3703
3704    } finally {
3705      try {
3706        flushThread.done();
3707        flushThread.join();
3708        flushThread.checkNoError();
3709
3710        putThread.join();
3711        putThread.checkNoError();
3712      } catch (InterruptedException ie) {
3713        LOG.warn("Caught exception when joining with flushThread", ie);
3714      }
3715
3716      try {
3717          HBaseTestingUtility.closeRegionAndWAL(this.region);
3718      } catch (DroppedSnapshotException dse) {
3719        // We could get this on way out because we interrupt the background flusher and it could
3720        // fail anywhere causing a DSE over in the background flusher... only it is not properly
3721        // dealt with so could still be memory hanging out when we get to here -- memory we can't
3722        // flush because the accounting is 'off' since original DSE.
3723      }
3724      this.region = null;
3725    }
3726  }
3727
3728  protected class PutThread extends Thread {
3729    private volatile boolean done;
3730    private volatile int numPutsFinished = 0;
3731
3732    private Throwable error = null;
3733    private int numRows;
3734    private byte[][] families;
3735    private byte[][] qualifiers;
3736
3737    private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
3738      super("PutThread");
3739      this.numRows = numRows;
3740      this.families = families;
3741      this.qualifiers = qualifiers;
3742    }
3743
3744    /**
3745     * Block calling thread until this instance of PutThread has put at least one row.
3746     */
3747    public void waitForFirstPut() throws InterruptedException {
3748      // wait until put thread actually puts some data
3749      while (isAlive() && numPutsFinished == 0) {
3750        checkNoError();
3751        Thread.sleep(50);
3752      }
3753    }
3754
3755    public void done() {
3756      done = true;
3757      synchronized (this) {
3758        interrupt();
3759      }
3760    }
3761
3762    public void checkNoError() {
3763      if (error != null) {
3764        assertNull(error);
3765      }
3766    }
3767
3768    @Override
3769    public void run() {
3770      done = false;
3771      while (!done) {
3772        try {
3773          for (int r = 0; r < numRows; r++) {
3774            byte[] row = Bytes.toBytes("row" + r);
3775            Put put = new Put(row);
3776            put.setDurability(Durability.SKIP_WAL);
3777            byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
3778            for (byte[] family : families) {
3779              for (byte[] qualifier : qualifiers) {
3780                put.addColumn(family, qualifier, numPutsFinished, value);
3781              }
3782            }
3783            region.put(put);
3784            numPutsFinished++;
3785            if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
3786              LOG.debug("put iteration = {}", numPutsFinished);
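              // Periodically delete versions older than (numPutsFinished - 30) so readers race
              // against deletes as well as puts.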
3787              Delete delete = new Delete(row, (long) numPutsFinished - 30);
3788              region.delete(delete);
3789            }
3790            numPutsFinished++;
3791          }
3792        } catch (InterruptedIOException e) {
3793          // This is fine. It means we are done, or didn't get the lock on time
3794          LOG.info("Interrupted", e);
3795        } catch (IOException e) {
3796          LOG.error("Error while putting records", e);
3797          error = e;
3798          break;
3799        }
3800      }
3801
3802    }
3803
3804  }
3805
3806  /**
3807   * Writes very wide records and gets the latest row every time. Flushes and
3808   * compacts the region aggressively to catch issues.
3809   *
3810   * @throws IOException
3811   *           by flush / scan / compaction
3812   * @throws InterruptedException
3813   *           when joining threads
3814   */
3815  @Test
3816  public void testWritesWhileGetting() throws Exception {
3817    int testCount = 50;
3818    int numRows = 1;
3819    int numFamilies = 10;
3820    int numQualifiers = 100;
3821    int compactInterval = 100;
3822    byte[][] families = new byte[numFamilies][];
3823    for (int i = 0; i < numFamilies; i++) {
3824      families[i] = Bytes.toBytes("family" + i);
3825    }
3826    byte[][] qualifiers = new byte[numQualifiers][];
3827    for (int i = 0; i < numQualifiers; i++) {
3828      qualifiers[i] = Bytes.toBytes("qual" + i);
3829    }
3830
3831
3832    // This test flushes constantly and can cause many files to be created, possibly
3833    // extending over the ulimit. Make sure compactions are aggressive in reducing
3834    // the number of HFiles created.
3837    Configuration conf = HBaseConfiguration.create(CONF);
3838    conf.setInt("hbase.hstore.compaction.min", 1);
3839    conf.setInt("hbase.hstore.compaction.max", 1000);
3840    this.region = initHRegion(tableName, method, conf, families);
3841    PutThread putThread = null;
3842    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
3843    try {
3844      putThread = new PutThread(numRows, families, qualifiers);
3845      putThread.start();
3846      putThread.waitForFirstPut();
3847
3848      // Add a thread that flushes as fast as possible
3849      ctx.addThread(new RepeatingTestThread(ctx) {
3850
3851        @Override
3852        public void doAnAction() throws Exception {
3853          region.flush(true);
3854          // Compact regularly to avoid creating too many files and exceeding
3855          // the ulimit.
3856          region.compact(false);
3857          for (HStore store : region.getStores()) {
3858            store.closeAndArchiveCompactedFiles();
3859          }
3860        }
3861      });
3862      ctx.startThreads();
3863
3864      Get get = new Get(Bytes.toBytes("row0"));
3865      Result result = null;
3866
3867      int expectedCount = numFamilies * numQualifiers;
3868
3869      long prevTimestamp = 0L;
3870      for (int i = 0; i < testCount; i++) {
3871        LOG.info("testWritesWhileGetting verify turn " + i);
3872        boolean previousEmpty = result == null || result.isEmpty();
3873        result = region.get(get);
3874        if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
3875          assertEquals("i=" + i, expectedCount, result.size());
3876          // TODO this was removed, now what dangit?!
3877          // search looking for the qualifier in question?
3878          long timestamp = 0;
3879          for (Cell kv : result.rawCells()) {
3880            if (CellUtil.matchingFamily(kv, families[0])
3881                && CellUtil.matchingQualifier(kv, qualifiers[0])) {
3882              timestamp = kv.getTimestamp();
3883            }
3884          }
3885          assertTrue(timestamp >= prevTimestamp);
3886          prevTimestamp = timestamp;
3887          Cell previousKV = null;
3888
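          // All cells of the row are written by a single put with a single value, so a
          // consistent read must never mix values from two different puts.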
3889          for (Cell kv : result.rawCells()) {
3890            byte[] thisValue = CellUtil.cloneValue(kv);
3891            if (previousKV != null) {
3892              if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) {
3893                LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV
3894                    + "(memStoreTS:" + previousKV.getSequenceId() + ")" + ", New KV: " + kv
3895                    + "(memStoreTS:" + kv.getSequenceId() + ")");
3896                assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue));
3897              }
3898            }
3899            previousKV = kv;
3900          }
3901        }
3902      }
3903    } finally {
3904      if (putThread != null)
3905        putThread.done();
3906
3907      region.flush(true);
3908
3909      if (putThread != null) {
3910        putThread.join();
3911        putThread.checkNoError();
3912      }
3913
3914      ctx.stop();
3915      HBaseTestingUtility.closeRegionAndWAL(this.region);
3916      this.region = null;
3917    }
3918  }
3919
3920  @Test
3921  public void testHolesInMeta() throws Exception {
3922    byte[] family = Bytes.toBytes("family");
3923    this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
3924        false, family);
3925    byte[] rowNotServed = Bytes.toBytes("a");
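    // "a" sorts before the region's start key "x", so the Get must be rejected with a
    // WrongRegionException; "y" falls inside [x, z) and is served normally.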
3926    Get g = new Get(rowNotServed);
3927    try {
3928      region.get(g);
3929      fail();
3930    } catch (WrongRegionException x) {
3931      // OK
3932    }
3933    byte[] row = Bytes.toBytes("y");
3934    g = new Get(row);
3935    region.get(g);
3936  }
3937
3938  @Test
3939  public void testIndexesScanWithOneDeletedRow() throws IOException {
3940    byte[] family = Bytes.toBytes("family");
3941
3942    // Setting up region
3943    this.region = initHRegion(tableName, method, CONF, family);
3944    Put put = new Put(Bytes.toBytes(1L));
3945    put.addColumn(family, qual1, 1L, Bytes.toBytes(1L));
3946    region.put(put);
3947
3948    region.flush(true);
3949
3950    Delete delete = new Delete(Bytes.toBytes(1L), 1L);
3951    region.delete(delete);
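    // The delete removes row 1's cell (written at ts 1), so only row 2 should survive and
    // match the filter below.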
3952
3953    put = new Put(Bytes.toBytes(2L));
3954    put.addColumn(family, qual1, 2L, Bytes.toBytes(2L));
3955    region.put(put);
3956
3957    Scan idxScan = new Scan();
3958    idxScan.addFamily(family);
3959    idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList(
3960        new SingleColumnValueFilter(family, qual1, CompareOperator.GREATER_OR_EQUAL,
3961            new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1,
3962                    CompareOperator.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L))))));
3963    InternalScanner scanner = region.getScanner(idxScan);
3964    List<Cell> res = new ArrayList<>();
3965
3966    while (scanner.next(res)) {
3967      // Ignore res value.
3968    }
3969    assertEquals(1L, res.size());
3970  }
3971
3972  // ////////////////////////////////////////////////////////////////////////////
3973  // Bloom filter test
3974  // ////////////////////////////////////////////////////////////////////////////
3975  @Test
3976  public void testBloomFilterSize() throws IOException {
3977    byte[] fam1 = Bytes.toBytes("fam1");
3978    byte[] qf1 = Bytes.toBytes("col");
3979    byte[] val1 = Bytes.toBytes("value1");
3980    // Create Table
3981    HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE)
3982        .setBloomFilterType(BloomType.ROWCOL);
3983
3984    HTableDescriptor htd = new HTableDescriptor(tableName);
3985    htd.addFamily(hcd);
3986    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
3987    this.region = TEST_UTIL.createLocalHRegion(info, htd);
3988    int num_unique_rows = 10;
3989    int duplicate_multiplier = 2;
3990    int num_storefiles = 4;
3991
3992    int version = 0;
3993    for (int f = 0; f < num_storefiles; f++) {
3994      for (int i = 0; i < duplicate_multiplier; i++) {
3995        for (int j = 0; j < num_unique_rows; j++) {
3996          Put put = new Put(Bytes.toBytes("row" + j));
3997          put.setDurability(Durability.SKIP_WAL);
3998          long ts = version++;
3999          put.addColumn(fam1, qf1, ts, val1);
4000          region.put(put);
4001        }
4002      }
4003      region.flush(true);
4004    }
4005    // before compaction
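    // Each of the 4 store files holds duplicate_multiplier versions per unique row, so a file
    // has num_unique_rows * duplicate_multiplier entries but only num_unique_rows distinct
    // row+col bloom keys.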
4006    HStore store = region.getStore(fam1);
4007    Collection<HStoreFile> storeFiles = store.getStorefiles();
4008    for (HStoreFile storefile : storeFiles) {
4009      StoreFileReader reader = storefile.getReader();
4010      reader.loadFileInfo();
4011      reader.loadBloomfilter();
4012      assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
4013      assertEquals(num_unique_rows, reader.getFilterEntries());
4014    }
4015
4016    region.compact(true);
4017
4018    // after compaction
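    // A major compaction merges all 4 files into one that keeps every version (max versions is
    // Integer.MAX_VALUE), so the entry count quadruples while the distinct row+col keys stay at
    // num_unique_rows.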
4019    storeFiles = store.getStorefiles();
4020    for (HStoreFile storefile : storeFiles) {
4021      StoreFileReader reader = storefile.getReader();
4022      reader.loadFileInfo();
4023      reader.loadBloomfilter();
4024      assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());
4025      assertEquals(num_unique_rows, reader.getFilterEntries());
4026    }
4027  }
4028
4029  @Test
4030  public void testAllColumnsWithBloomFilter() throws IOException {
4031    byte[] TABLE = Bytes.toBytes(name.getMethodName());
4032    byte[] FAMILY = Bytes.toBytes("family");
4033
4034    // Create table
4035    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE)
4036        .setBloomFilterType(BloomType.ROWCOL);
4037    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
4038    htd.addFamily(hcd);
4039    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4040    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4041    // For row:0, col:0: insert versions 1 through 5.
4042    byte[] row = Bytes.toBytes("row:" + 0);
4043    byte[] column = Bytes.toBytes("column:" + 0);
4044    Put put = new Put(row);
4045    put.setDurability(Durability.SKIP_WAL);
4046    for (long idx = 1; idx <= 4; idx++) {
4047      put.addColumn(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx));
4048    }
4049    region.put(put);
4050
4051    // Flush
4052    region.flush(true);
4053
4054    // Get rows
4055    Get get = new Get(row);
4056    get.readAllVersions();
4057    Cell[] kvs = region.get(get).rawCells();
4058
4059    // Check if rows are correct
4060    assertEquals(4, kvs.length);
4061    checkOneCell(kvs[0], FAMILY, 0, 0, 4);
4062    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
4063    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
4064    checkOneCell(kvs[3], FAMILY, 0, 0, 1);
4065  }
4066
4067  /**
4068   * Test case covering the bug fix for HBASE-2823: ensures a correct delete when
4069   * issuing a row delete on columns with the bloom filter set to row+col
4070   * (BloomType.ROWCOL).
4071   */
4072  @Test
4073  public void testDeleteRowWithBloomFilter() throws IOException {
4074    byte[] familyName = Bytes.toBytes("familyName");
4075
4076    // Create Table
4077    HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE)
4078        .setBloomFilterType(BloomType.ROWCOL);
4079
4080    HTableDescriptor htd = new HTableDescriptor(tableName);
4081    htd.addFamily(hcd);
4082    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4083    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4084    // Insert some data
4085    byte[] row = Bytes.toBytes("row1");
4086    byte[] col = Bytes.toBytes("col1");
4087
4088    Put put = new Put(row);
4089    put.addColumn(familyName, col, 1, Bytes.toBytes("SomeRandomValue"));
4090    region.put(put);
4091    region.flush(true);
4092
4093    Delete del = new Delete(row);
4094    region.delete(del);
4095    region.flush(true);
4096
4097    // Get remaining rows (should have none)
4098    Get get = new Get(row);
4099    get.addColumn(familyName, col);
4100
4101    Cell[] keyValues = region.get(get).rawCells();
4102    assertEquals(0, keyValues.length);
4103  }
4104
4105  @Test
4106  public void testgetHDFSBlocksDistribution() throws Exception {
4107    HBaseTestingUtility htu = new HBaseTestingUtility();
4108    // Why do we set the block size in this test?  If we set it smaller than the kvs, then we'll
4109    // break up the file into more pieces that can be distributed across the three nodes and we
4110    // won't be able to have the condition this test asserts: that at least one node has
4111    // a copy of all replicas -- if small block size, then blocks are spread evenly across
4112    // the three nodes.  hfilev3 with tags seems to put us over the block size.  St.Ack.
4113    // final int DEFAULT_BLOCK_SIZE = 1024;
4114    // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
4115    htu.getConfiguration().setInt("dfs.replication", 2);
4116
4117    // set up a cluster with 3 nodes
4118    MiniHBaseCluster cluster = null;
4119    String[] dataNodeHosts = new String[] { "host1", "host2", "host3" };
4120    int regionServersCount = 3;
4121
4122    try {
4123      StartMiniClusterOption option = StartMiniClusterOption.builder()
4124          .numRegionServers(regionServersCount).dataNodeHosts(dataNodeHosts).build();
4125      cluster = htu.startMiniCluster(option);
4126      byte[][] families = { fam1, fam2 };
4127      Table ht = htu.createTable(tableName, families);
4128
4129      // Setting up region
4130      byte[] row = Bytes.toBytes("row1");
4131      byte[] col = Bytes.toBytes("col1");
4132
4133      Put put = new Put(row);
4134      put.addColumn(fam1, col, 1, Bytes.toBytes("test1"));
4135      put.addColumn(fam2, col, 1, Bytes.toBytes("test2"));
4136      ht.put(put);
4137
4138      HRegion firstRegion = htu.getHBaseCluster().getRegions(tableName).get(0);
4139      firstRegion.flush(true);
4140      HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution();
4141
4142      // Given the default replication factor is 2 and we have 2 HFiles,
4143      // we will have a total of 4 block replicas on 3 datanodes; thus there
4144      // must be at least one host that has replicas of both HFiles. That host's
4145      // weight will be equal to the unique block weight.
4146      long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight();
4147      StringBuilder sb = new StringBuilder();
4148      for (String host: blocksDistribution1.getTopHosts()) {
4149        if (sb.length() > 0) sb.append(", ");
4150        sb.append(host);
4151        sb.append("=");
4152        sb.append(blocksDistribution1.getWeight(host));
4153      }
4154
4155      String topHost = blocksDistribution1.getTopHosts().get(0);
4156      long topHostWeight = blocksDistribution1.getWeight(topHost);
4157      String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" +
4158        topHostWeight + ", topHost=" + topHost + "; " + sb.toString();
4159      LOG.info(msg);
4160      assertTrue(msg, uniqueBlocksWeight1 == topHostWeight);
4161
4162      // use the static method to compute the value, it should be the same.
4163      // static method is used by load balancer or other components
4164      HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(
4165          htu.getConfiguration(), firstRegion.getTableDescriptor(), firstRegion.getRegionInfo());
4166      long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight();
4167
4168      assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
4169
4170      ht.close();
4171    } finally {
4172      if (cluster != null) {
4173        htu.shutdownMiniCluster();
4174      }
4175    }
4176  }
4177
4178  /**
4179   * Test case to check that the state of the region initialization task is set to ABORTED
4180   * if any exception occurs during initialization.
4181   *
4182   * @throws Exception
4183   */
4184  @Test
4185  public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception {
4186    HRegionInfo info;
4187    try {
4188      FileSystem fs = Mockito.mock(FileSystem.class);
4189      Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException());
4190      HTableDescriptor htd = new HTableDescriptor(tableName);
4191      htd.addFamily(new HColumnDescriptor("cf"));
4192      info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
4193          HConstants.EMPTY_BYTE_ARRAY, false);
4194      Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
4195      region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
4196      // region initialization throws IOException and sets the task state to ABORTED.
4197      region.initialize();
4198      fail("Region initialization should fail due to IOException");
4199    } catch (IOException io) {
4200      List<MonitoredTask> tasks = TaskMonitor.get().getTasks();
4201      for (MonitoredTask monitoredTask : tasks) {
4202        if (!(monitoredTask instanceof MonitoredRPCHandler)
4203            && monitoredTask.getDescription().contains(region.toString())) {
4204          assertTrue("Region state should be ABORTED.",
4205              monitoredTask.getState().equals(MonitoredTask.State.ABORTED));
4206          break;
4207        }
4208      }
4209    }
4210  }
4211
4212  /**
4213   * Verifies that the .regioninfo file is written on region creation and that
4214   * it is recreated if missing during region opening.
4215   */
4216  @Test
4217  public void testRegionInfoFileCreation() throws IOException {
4218    Path rootDir = new Path(dir + "testRegionInfoFileCreation");
4219
4220    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4221    htd.addFamily(new HColumnDescriptor("cf"));
4222
4223    HRegionInfo hri = new HRegionInfo(htd.getTableName());
4224
4225    // Create a region and skip the initialization (like CreateTableHandler)
4226    region = HBaseTestingUtility.createRegionAndWAL(hri, rootDir, CONF, htd, false);
4227    Path regionDir = region.getRegionFileSystem().getRegionDir();
4228    FileSystem fs = region.getRegionFileSystem().getFileSystem();
4229    HBaseTestingUtility.closeRegionAndWAL(region);
4230
4231    Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
4232
4233    // Verify that the .regioninfo file is present
4234    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4235        fs.exists(regionInfoFile));
4236
4237    // Try to open the region
4238    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4239    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4240    HBaseTestingUtility.closeRegionAndWAL(region);
4241
4242    // Verify that the .regioninfo file is still there
4243    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4244        fs.exists(regionInfoFile));
4245
4246    // Remove the .regioninfo file and verify it is recreated on region open
4247    fs.delete(regionInfoFile, true);
4248    assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
4249        fs.exists(regionInfoFile));
4250
4251    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4252//    region = TEST_UTIL.openHRegion(hri, htd);
4253    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4254    HBaseTestingUtility.closeRegionAndWAL(region);
4255
4256    // Verify that the .regioninfo file is still there
4257    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4258        fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE)));
4259
4260    region = null;
4261  }
4262
4263  /**
4264   * Worker Runnable that increments a fixed cell a given number of times; used by the parallel increment test.
4265   */
4266  private static class Incrementer implements Runnable {
4267    private HRegion region;
4268    private final static byte[] incRow = Bytes.toBytes("incRow");
4269    private final static byte[] family = Bytes.toBytes("family");
4270    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4271    private final static long ONE = 1L;
4272    private int incCounter;
4273
4274    public Incrementer(HRegion region, int incCounter) {
4275      this.region = region;
4276      this.incCounter = incCounter;
4277    }
4278
4279    @Override
4280    public void run() {
4281      int count = 0;
4282      while (count < incCounter) {
4283        Increment inc = new Increment(incRow);
4284        inc.addColumn(family, qualifier, ONE);
4285        count++;
4286        try {
4287          region.increment(inc);
4288        } catch (IOException e) {
4289          LOG.info("Count=" + count + ", " + e);
4290          break;
4291        }
4292      }
4293    }
4294  }
4295
4296  /**
4297   * Test case to check increment function with memstore flushing
4298   * @throws Exception
4299   */
4300  @Test
4301  public void testParallelIncrementWithMemStoreFlush() throws Exception {
4302    byte[] family = Incrementer.family;
4303    this.region = initHRegion(tableName, method, CONF, family);
4304    final HRegion region = this.region;
4305    final AtomicBoolean incrementDone = new AtomicBoolean(false);
4306    Runnable flusher = new Runnable() {
4307      @Override
4308      public void run() {
4309        while (!incrementDone.get()) {
4310          try {
4311            region.flush(true);
4312          } catch (Exception e) {
4313            e.printStackTrace();
4314          }
4315        }
4316      }
4317    };
4318
4319    // after all increments finish, the cell value will be threadNum * incCounter = 20 * 100 = 2000
4320    int threadNum = 20;
4321    int incCounter = 100;
4322    long expected = (long) threadNum * incCounter;
4323    Thread[] incrementers = new Thread[threadNum];
4324    Thread flushThread = new Thread(flusher);
4325    for (int i = 0; i < threadNum; i++) {
4326      incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
4327      incrementers[i].start();
4328    }
4329    flushThread.start();
4330    for (int i = 0; i < threadNum; i++) {
4331      incrementers[i].join();
4332    }
4333
4334    incrementDone.set(true);
4335    flushThread.join();
4336
4337    Get get = new Get(Incrementer.incRow);
4338    get.addColumn(Incrementer.family, Incrementer.qualifier);
4339    get.readVersions(1);
4340    Result res = this.region.get(get);
4341    List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
4342
4343    // we just got the latest version
4344    assertEquals(1, kvs.size());
4345    Cell kv = kvs.get(0);
4346    assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
4347  }
4348
4349  /**
4350   * Worker Runnable that appends a single character to a fixed cell a given number of times; used by the parallel append test.
4351   */
4352  private static class Appender implements Runnable {
4353    private HRegion region;
4354    private final static byte[] appendRow = Bytes.toBytes("appendRow");
4355    private final static byte[] family = Bytes.toBytes("family");
4356    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4357    private final static byte[] CHAR = Bytes.toBytes("a");
4358    private int appendCounter;
4359
4360    public Appender(HRegion region, int appendCounter) {
4361      this.region = region;
4362      this.appendCounter = appendCounter;
4363    }
4364
4365    @Override
4366    public void run() {
4367      int count = 0;
4368      while (count < appendCounter) {
4369        Append app = new Append(appendRow);
4370        app.addColumn(family, qualifier, CHAR);
4371        count++;
4372        try {
4373          region.append(app);
4374        } catch (IOException e) {
4375          LOG.info("Count=" + count + ", max=" + appendCounter + ", " + e);
4376          break;
4377        }
4378      }
4379    }
4380  }
4381
4382  /**
4383   * Test case to check append function with memstore flushing
4384   * @throws Exception
4385   */
4386  @Test
4387  public void testParallelAppendWithMemStoreFlush() throws Exception {
4388    byte[] family = Appender.family;
4389    this.region = initHRegion(tableName, method, CONF, family);
4390    final HRegion region = this.region;
4391    final AtomicBoolean appendDone = new AtomicBoolean(false);
4392    Runnable flusher = new Runnable() {
4393      @Override
4394      public void run() {
4395        while (!appendDone.get()) {
4396          try {
4397            region.flush(true);
4398          } catch (Exception e) {
4399            e.printStackTrace();
4400          }
4401        }
4402      }
4403    };
4404
4405    // After all appends finish, the value will consist of threadNum * appendCounter
4406    // copies of Appender.CHAR
4407    int threadNum = 20;
4408    int appendCounter = 100;
4409    byte[] expected = new byte[threadNum * appendCounter];
4410    for (int i = 0; i < threadNum * appendCounter; i++) {
4411      System.arraycopy(Appender.CHAR, 0, expected, i, 1);
4412    }
4413    Thread[] appenders = new Thread[threadNum];
4414    Thread flushThread = new Thread(flusher);
4415    for (int i = 0; i < threadNum; i++) {
4416      appenders[i] = new Thread(new Appender(this.region, appendCounter));
4417      appenders[i].start();
4418    }
4419    flushThread.start();
4420    for (int i = 0; i < threadNum; i++) {
4421      appenders[i].join();
4422    }
4423
4424    appendDone.set(true);
4425    flushThread.join();
4426
4427    Get get = new Get(Appender.appendRow);
4428    get.addColumn(Appender.family, Appender.qualifier);
4429    get.readVersions(1);
4430    Result res = this.region.get(get);
4431    List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
4432
4433    // we just got the latest version
4434    assertEquals(1, kvs.size());
4435    Cell kv = kvs.get(0);
4436    byte[] appendResult = new byte[kv.getValueLength()];
4437    System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
4438    assertArrayEquals(expected, appendResult);
4439  }
4440
4441  /**
4442   * Test case to check the put function with memstore flushing for the same row and timestamp
4443   * @throws Exception
4444   */
4445  @Test
4446  public void testPutWithMemStoreFlush() throws Exception {
4447    byte[] family = Bytes.toBytes("family");
4448    byte[] qualifier = Bytes.toBytes("qualifier");
4449    byte[] row = Bytes.toBytes("putRow");
4450    byte[] value = null;
4451    this.region = initHRegion(tableName, method, CONF, family);
4452    Put put = null;
4453    Get get = null;
4454    List<Cell> kvs = null;
4455    Result res = null;
4456
4457    put = new Put(row);
4458    value = Bytes.toBytes("value0");
4459    put.addColumn(family, qualifier, 1234567L, value);
4460    region.put(put);
4461    get = new Get(row);
4462    get.addColumn(family, qualifier);
4463    get.readAllVersions();
4464    res = this.region.get(get);
4465    kvs = res.getColumnCells(family, qualifier);
4466    assertEquals(1, kvs.size());
4467    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4468
4469    region.flush(true);
4470    get = new Get(row);
4471    get.addColumn(family, qualifier);
4472    get.readAllVersions();
4473    res = this.region.get(get);
4474    kvs = res.getColumnCells(family, qualifier);
4475    assertEquals(1, kvs.size());
4476    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4477
4478    put = new Put(row);
4479    value = Bytes.toBytes("value1");
4480    put.addColumn(family, qualifier, 1234567L, value);
4481    region.put(put);
4482    get = new Get(row);
4483    get.addColumn(family, qualifier);
4484    get.readAllVersions();
4485    res = this.region.get(get);
4486    kvs = res.getColumnCells(family, qualifier);
4487    assertEquals(1, kvs.size());
4488    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4489
4490    region.flush(true);
4491    get = new Get(row);
4492    get.addColumn(family, qualifier);
4493    get.readAllVersions();
4494    res = this.region.get(get);
4495    kvs = res.getColumnCells(family, qualifier);
4496    assertEquals(1, kvs.size());
4497    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4498  }
4499
4500  @Test
4501  public void testDurability() throws Exception {
4502    // there are 5 x 5 cases:
4503    // table durability(SYNC,FSYNC,ASYNC,SKIP,USE_DEFAULT) x mutation
4504    // durability(SYNC,FSYNC,ASYNC,SKIP,USE_DEFAULT)
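    // The combinations are grouped below by expected outcome: WAL append with a synchronous
    // sync, append with the sync coming only from the background log syncer, and no WAL
    // append at all.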
4505
4506    // expected cases for append and sync wal
4507    durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4508    durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4509    durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4510
4511    durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4512    durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4513    durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4514
4515    durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4516    durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4517
4518    durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
4519    durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4520
4521    durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
4522    durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
4523    durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
4524
4525    // expected cases for async wal
4526    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4527    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4528    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4529    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4530    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
4531    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4532
4533    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4534    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4535    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4536    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4537    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
4538    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
4539
4540    // expected cases for skip wal
4541    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4542    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4543    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4544    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
4545    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
4546    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
4547
4548  }
4549
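  /**
   * Runs a single (table durability, mutation durability) combination against a spied WAL and
   * verifies the expected interactions: expectAppend means WAL#appendData should be invoked
   * exactly once, expectSync means HRegion itself should call WAL#sync(long), and
   * expectSyncFromLogSyncer means the background log syncer should call WAL#sync() within the
   * given timeout.
   */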
4550  private void durabilityTest(String method, Durability tableDurability,
4551      Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
4552      final boolean expectSyncFromLogSyncer) throws Exception {
4553    Configuration conf = HBaseConfiguration.create(CONF);
4554    method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
4555    byte[] family = Bytes.toBytes("family");
4556    Path logDir = new Path(new Path(dir + method), "log");
4557    final Configuration walConf = new Configuration(conf);
4558    FSUtils.setRootDir(walConf, logDir);
4559    // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not
4560    // deal with classes which have a field of an inner class. See discussions in HBASE-15536.
4561    walConf.set(WALFactory.WAL_PROVIDER, "filesystem");
4562    final WALFactory wals = new WALFactory(walConf, TEST_UTIL.getRandomUUID().toString());
4563    final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()));
4564    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
4565        HConstants.EMPTY_END_ROW, false, tableDurability, wal,
4566        new byte[][] { family });
4567
4568    Put put = new Put(Bytes.toBytes("r1"));
4569    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
4570    put.setDurability(mutationDurability);
4571    region.put(put);
4572
4573    // verify append called or not
4574    verify(wal, expectAppend ? times(1) : never()).appendData((HRegionInfo) any(),
4575      (WALKeyImpl) any(), (WALEdit) any());
4576
4577    // verify sync called or not
4578    if (expectSync || expectSyncFromLogSyncer) {
4579      TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
4580        @Override
4581        public boolean evaluate() throws Exception {
4582          try {
4583            if (expectSync) {
4584              verify(wal, times(1)).sync(anyLong()); // HRegion calls this one
4585            } else if (expectSyncFromLogSyncer) {
4586              verify(wal, times(1)).sync(); // wal syncer calls this one
4587            }
4588          } catch (Throwable ignore) {
4589          }
4590          return true;
4591        }
4592      });
4593    } else {
4594      //verify(wal, never()).sync(anyLong());
4595      verify(wal, never()).sync();
4596    }
4597
4598    HBaseTestingUtility.closeRegionAndWAL(this.region);
4599    wals.close();
4600    this.region = null;
4601  }
4602
4603  @Test
4604  public void testRegionReplicaSecondary() throws IOException {
4605    // create a primary region, load some data and flush
4606    // create a secondary region, and do a get against that
4607    Path rootDir = new Path(dir + name.getMethodName());
4608    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4609
4610    byte[][] families = new byte[][] {
4611        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4612    };
4613    byte[] cq = Bytes.toBytes("cq");
4614    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4615    for (byte[] family : families) {
4616      htd.addFamily(new HColumnDescriptor(family));
4617    }
4618
4619    long time = System.currentTimeMillis();
4620    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4621      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4622      false, time, 0);
4623    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4624      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4625      false, time, 1);
4626
4627    HRegion primaryRegion = null, secondaryRegion = null;
4628
4629    try {
4630      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4631          rootDir, TEST_UTIL.getConfiguration(), htd);
4632
4633      // load some data
4634      putData(primaryRegion, 0, 1000, cq, families);
4635
4636      // flush region
4637      primaryRegion.flush(true);
4638
4639      // open secondary region
4640      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4641
4642      verifyData(secondaryRegion, 0, 1000, cq, families);
4643    } finally {
4644      if (primaryRegion != null) {
4645        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4646      }
4647      if (secondaryRegion != null) {
4648        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4649      }
4650    }
4651  }
4652
4653  @Test
4654  public void testRegionReplicaSecondaryIsReadOnly() throws IOException {
4655    // create a primary region, load some data and flush
4656    // create a secondary region, and do a put against that
4657    Path rootDir = new Path(dir + name.getMethodName());
4658    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4659
4660    byte[][] families = new byte[][] {
4661        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4662    };
4663    byte[] cq = Bytes.toBytes("cq");
4664    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4665    for (byte[] family : families) {
4666      htd.addFamily(new HColumnDescriptor(family));
4667    }
4668
4669    long time = System.currentTimeMillis();
4670    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4671      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4672      false, time, 0);
4673    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4674      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4675      false, time, 1);
4676
4677    HRegion primaryRegion = null, secondaryRegion = null;
4678
4679    try {
4680      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4681          rootDir, TEST_UTIL.getConfiguration(), htd);
4682
4683      // load some data
4684      putData(primaryRegion, 0, 1000, cq, families);
4685
4686      // flush region
4687      primaryRegion.flush(true);
4688
4689      // open secondary region
4690      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4691
4692      try {
4693        putData(secondaryRegion, 0, 1000, cq, families);
4694        fail("Should have thrown exception");
4695      } catch (IOException ex) {
4696        // expected
4697      }
4698    } finally {
4699      if (primaryRegion != null) {
4700        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4701      }
4702      if (secondaryRegion != null) {
4703        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4704      }
4705    }
4706  }
4707
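  /**
   * Creates a WALFactory configured so that its WAL files live under the given root directory.
   */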
4708  static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
4709    Configuration confForWAL = new Configuration(conf);
4710    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
4711    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8));
4712  }
4713
4714  @Test
4715  public void testCompactionFromPrimary() throws IOException {
4716    Path rootDir = new Path(dir + name.getMethodName());
4717    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4718
4719    byte[][] families = new byte[][] {
4720        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4721    };
4722    byte[] cq = Bytes.toBytes("cq");
4723    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4724    for (byte[] family : families) {
4725      htd.addFamily(new HColumnDescriptor(family));
4726    }
4727
4728    long time = System.currentTimeMillis();
4729    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4730      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4731      false, time, 0);
4732    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4733      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4734      false, time, 1);
4735
4736    HRegion primaryRegion = null, secondaryRegion = null;
4737
4738    try {
4739      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4740          rootDir, TEST_UTIL.getConfiguration(), htd);
4741
4742      // load some data
4743      putData(primaryRegion, 0, 1000, cq, families);
4744
4745      // flush region
4746      primaryRegion.flush(true);
4747
4748      // open secondary region
4749      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4750
4751      // move the file of the primary region to the archive, simulating a compaction
4752      Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
4753      primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
4754      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
4755          .getStoreFiles(families[0]);
4756      Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
4757
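      // The secondary should still be able to serve reads even though the primary's store files
      // were moved to the archive.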
4758      verifyData(secondaryRegion, 0, 1000, cq, families);
4759    } finally {
4760      if (primaryRegion != null) {
4761        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4762      }
4763      if (secondaryRegion != null) {
4764        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4765      }
4766    }
4767  }
4768
4769  private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws
4770      IOException {
4771    putData(this.region, startRow, numRows, qf, families);
4772  }
4773
4774  private void putData(HRegion region,
4775      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4776    putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
4777  }
4778
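  /**
   * Writes rows [startRow, startRow + numRows) into every given family under qualifier qf,
   * using the supplied durability. Cell values are left null.
   */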
4779  static void putData(HRegion region, Durability durability,
4780      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4781    for (int i = startRow; i < startRow + numRows; i++) {
4782      Put put = new Put(Bytes.toBytes("" + i));
4783      put.setDurability(durability);
4784      for (byte[] family : families) {
4785        put.addColumn(family, qf, null);
4786      }
4787      region.put(put);
4788      LOG.info(put.toString());
4789    }
4790  }
4791
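  /**
   * Reads back the rows written by putData and asserts that each row returns exactly one cell
   * per family with the expected row, family and qualifier.
   */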
4792  static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
4793      throws IOException {
4794    for (int i = startRow; i < startRow + numRows; i++) {
4795      byte[] row = Bytes.toBytes("" + i);
4796      Get get = new Get(row);
4797      for (byte[] family : families) {
4798        get.addColumn(family, qf);
4799      }
4800      Result result = newReg.get(get);
4801      Cell[] raw = result.rawCells();
4802      assertEquals(families.length, result.size());
4803      for (int j = 0; j < families.length; j++) {
4804        assertTrue(CellUtil.matchingRows(raw[j], row));
4805        assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
4806        assertTrue(CellUtil.matchingQualifier(raw[j], qf));
4807      }
4808    }
4809  }
4810
4811  static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
4812    // Now I have k, get values out and assert they are as expected.
4813    Get get = new Get(k).addFamily(family).readAllVersions();
4814    Cell[] results = r.get(get).rawCells();
4815    for (int j = 0; j < results.length; j++) {
4816      byte[] tmp = CellUtil.cloneValue(results[j]);
4817      // Row should be equal to value every time.
4818      assertTrue(Bytes.equals(k, tmp));
4819    }
4820  }
4821
4822  /*
4823   * Assert that the first value in the passed region is <code>firstValue</code>.
4824   *
4825   * @param r region to scan
4826   * @param fs the single column family to scan
4827   * @param firstValue expected value of the first cell returned
4828   * @throws IOException
4829   */
4833  protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
4834      throws IOException {
4835    byte[][] families = { fs };
4836    Scan scan = new Scan();
4837    for (int i = 0; i < families.length; i++)
4838      scan.addFamily(families[i]);
4839    InternalScanner s = r.getScanner(scan);
4840    try {
4841      List<Cell> curVals = new ArrayList<>();
4842      boolean first = true;
4843      OUTER_LOOP: while (s.next(curVals)) {
4844        for (Cell kv : curVals) {
4845          byte[] val = CellUtil.cloneValue(kv);
4846          byte[] curval = val;
4847          if (first) {
4848            first = false;
4849            assertTrue(Bytes.compareTo(curval, firstValue) == 0);
4850          } else {
4851            // Not asserting anything. Might as well break.
4852            break OUTER_LOOP;
4853          }
4854        }
4855      }
4856    } finally {
4857      s.close();
4858    }
4859  }
4860
4861  /**
4862   * Test that we get the expected flush results back
4863   */
4864  @Test
4865  public void testFlushResult() throws IOException {
4866    byte[] family = Bytes.toBytes("family");
4867
4868    this.region = initHRegion(tableName, method, family);
4869
4870    // empty memstore, flush doesn't run
4871    HRegion.FlushResult fr = region.flush(true);
4872    assertFalse(fr.isFlushSucceeded());
4873    assertFalse(fr.isCompactionNeeded());
4874
4875    // Flush enough files to get up to the threshold; compactions are not needed yet
4876    for (int i = 0; i < 2; i++) {
4877      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
4878      region.put(put);
4879      fr = region.flush(true);
4880      assertTrue(fr.isFlushSucceeded());
4881      assertFalse(fr.isCompactionNeeded());
4882    }
4883
4884    // Two flushes after the threshold, compactions are needed
4885    for (int i = 0; i < 2; i++) {
4886      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
4887      region.put(put);
4888      fr = region.flush(true);
4889      assertTrue(fr.isFlushSucceeded());
4890      assertTrue(fr.isCompactionNeeded());
4891    }
4892  }
4893
4894  protected Configuration initSplit() {
4895    // Always compact if there is more than one store file.
4896    CONF.setInt("hbase.hstore.compactionThreshold", 2);
4897
4898    CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
4899
4900    // Increase the amount of time between client retries
4901    CONF.setLong("hbase.client.pause", 15 * 1000);
4902
4903    // This size should make it so we always split using the addContent
4904    // below. After adding all data, the first region is 1.3M
4905    CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
4906    return CONF;
4907  }
4908
4909  /**
4910   * @return A region on which you must call
4911   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4912   */
4913  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
4914      byte[]... families) throws IOException {
4915    return initHRegion(tableName, callingMethod, conf, false, families);
4916  }
4917
4918  /**
4919   * @return A region on which you must call
4920   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4921   */
4922  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
4923      boolean isReadOnly, byte[]... families) throws IOException {
4924    return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
4925  }
4926
4927  protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
4928      String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
4929      throws IOException {
4930    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
4931    HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
4932    final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
4933    return initHRegion(tableName, startKey, stopKey, isReadOnly,
4934        Durability.SYNC_WAL, wal, families);
4935  }
4936
4937  /**
4938   * @return A region on which you must call
4939   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4940   */
4941  public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
4942      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
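    // Initialize the global MemStoreLAB chunk creator before the region (and its memstores) is created.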
4943    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
4944    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey,
4945        isReadOnly, durability, wal, families);
4946  }
4947
4948  /**
4949   * Assert that the passed in Cell has expected contents for the specified row,
4950   * column & timestamp.
4951   */
4952  private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
4953    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
4954    assertEquals("Row mismatch while checking: " + ctx, "row:" + rowIdx,
4955        Bytes.toString(CellUtil.cloneRow(kv)));
4956    assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
4957        Bytes.toString(CellUtil.cloneFamily(kv)));
4958    assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
4959        Bytes.toString(CellUtil.cloneQualifier(kv)));
4960    assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
4961    assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
4962        Bytes.toString(CellUtil.cloneValue(kv)));
4963  }
4964
4965  @Test
4966  public void testReverseScanner_FromMemStore_SingleCF_Normal()
4967      throws IOException {
4968    byte[] rowC = Bytes.toBytes("rowC");
4969    byte[] rowA = Bytes.toBytes("rowA");
4970    byte[] rowB = Bytes.toBytes("rowB");
4971    byte[] cf = Bytes.toBytes("CF");
4972    byte[][] families = { cf };
4973    byte[] col = Bytes.toBytes("C");
4974    long ts = 1;
4975    this.region = initHRegion(tableName, method, families);
4976    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
4977    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
4978        null);
4979    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
4980    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
4981    Put put = null;
4982    put = new Put(rowC);
4983    put.add(kv1);
4984    put.add(kv11);
4985    region.put(put);
4986    put = new Put(rowA);
4987    put.add(kv2);
4988    region.put(put);
4989    put = new Put(rowB);
4990    put.add(kv3);
4991    region.put(put);
4992
4993    Scan scan = new Scan(rowC);
4994    scan.setMaxVersions(5);
4995    scan.setReversed(true);
4996    InternalScanner scanner = region.getScanner(scan);
4997    List<Cell> currRow = new ArrayList<>();
4998    boolean hasNext = scanner.next(currRow);
4999    assertEquals(2, currRow.size());
5000    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5001        .get(0).getRowLength(), rowC, 0, rowC.length));
5002    assertTrue(hasNext);
5003    currRow.clear();
5004    hasNext = scanner.next(currRow);
5005    assertEquals(1, currRow.size());
5006    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5007        .get(0).getRowLength(), rowB, 0, rowB.length));
5008    assertTrue(hasNext);
5009    currRow.clear();
5010    hasNext = scanner.next(currRow);
5011    assertEquals(1, currRow.size());
5012    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5013        .get(0).getRowLength(), rowA, 0, rowA.length));
5014    assertFalse(hasNext);
5015    scanner.close();
5016  }
5017
5018  @Test
5019  public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
5020      throws IOException {
5021    byte[] rowC = Bytes.toBytes("rowC");
5022    byte[] rowA = Bytes.toBytes("rowA");
5023    byte[] rowB = Bytes.toBytes("rowB");
5024    byte[] rowD = Bytes.toBytes("rowD");
5025    byte[] cf = Bytes.toBytes("CF");
5026    byte[][] families = { cf };
5027    byte[] col = Bytes.toBytes("C");
5028    long ts = 1;
5029    this.region = initHRegion(tableName, method, families);
5030    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5031    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5032        null);
5033    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5034    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5035    Put put = null;
5036    put = new Put(rowC);
5037    put.add(kv1);
5038    put.add(kv11);
5039    region.put(put);
5040    put = new Put(rowA);
5041    put.add(kv2);
5042    region.put(put);
5043    put = new Put(rowB);
5044    put.add(kv3);
5045    region.put(put);
5046
5047    Scan scan = new Scan(rowD);
5048    List<Cell> currRow = new ArrayList<>();
5049    scan.setReversed(true);
5050    scan.setMaxVersions(5);
5051    InternalScanner scanner = region.getScanner(scan);
5052    boolean hasNext = scanner.next(currRow);
5053    assertEquals(2, currRow.size());
5054    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5055        .get(0).getRowLength(), rowC, 0, rowC.length));
5056    assertTrue(hasNext);
5057    currRow.clear();
5058    hasNext = scanner.next(currRow);
5059    assertEquals(1, currRow.size());
5060    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5061        .get(0).getRowLength(), rowB, 0, rowB.length));
5062    assertTrue(hasNext);
5063    currRow.clear();
5064    hasNext = scanner.next(currRow);
5065    assertEquals(1, currRow.size());
5066    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5067        .get(0).getRowLength(), rowA, 0, rowA.length));
5068    assertFalse(hasNext);
5069    scanner.close();
5070  }
5071
5072  @Test
5073  public void testReverseScanner_FromMemStore_SingleCF_FullScan()
5074      throws IOException {
5075    byte[] rowC = Bytes.toBytes("rowC");
5076    byte[] rowA = Bytes.toBytes("rowA");
5077    byte[] rowB = Bytes.toBytes("rowB");
5078    byte[] cf = Bytes.toBytes("CF");
5079    byte[][] families = { cf };
5080    byte[] col = Bytes.toBytes("C");
5081    long ts = 1;
5082    this.region = initHRegion(tableName, method, families);
5083    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5084    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5085        null);
5086    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5087    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5088    Put put = null;
5089    put = new Put(rowC);
5090    put.add(kv1);
5091    put.add(kv11);
5092    region.put(put);
5093    put = new Put(rowA);
5094    put.add(kv2);
5095    region.put(put);
5096    put = new Put(rowB);
5097    put.add(kv3);
5098    region.put(put);
5099    Scan scan = new Scan();
5100    List<Cell> currRow = new ArrayList<>();
5101    scan.setReversed(true);
5102    InternalScanner scanner = region.getScanner(scan);
5103    boolean hasNext = scanner.next(currRow);
5104    assertEquals(1, currRow.size());
5105    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5106        .get(0).getRowLength(), rowC, 0, rowC.length));
5107    assertTrue(hasNext);
5108    currRow.clear();
5109    hasNext = scanner.next(currRow);
5110    assertEquals(1, currRow.size());
5111    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5112        .get(0).getRowLength(), rowB, 0, rowB.length));
5113    assertTrue(hasNext);
5114    currRow.clear();
5115    hasNext = scanner.next(currRow);
5116    assertEquals(1, currRow.size());
5117    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5118        .get(0).getRowLength(), rowA, 0, rowA.length));
5119    assertFalse(hasNext);
5120    scanner.close();
5121  }
5122
5123  @Test
5124  public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
5125    // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
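    // A reversed scan over ["rowD", "rowA") that selects only col1 must terminate even though
    // "rowD" also carries a col2 cell that the scan filters out.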
5126    byte[] rowA = Bytes.toBytes("rowA");
5127    byte[] rowB = Bytes.toBytes("rowB");
5128    byte[] rowC = Bytes.toBytes("rowC");
5129    byte[] rowD = Bytes.toBytes("rowD");
5130    byte[] rowE = Bytes.toBytes("rowE");
5131    byte[] cf = Bytes.toBytes("CF");
5132    byte[][] families = { cf };
5133    byte[] col1 = Bytes.toBytes("col1");
5134    byte[] col2 = Bytes.toBytes("col2");
5135    long ts = 1;
5136    this.region = initHRegion(tableName, method, families);
5137    KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5138    KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5139    KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5140    KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5141    KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5142    KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5143    Put put = null;
5144    put = new Put(rowA);
5145    put.add(kv1);
5146    region.put(put);
5147    put = new Put(rowB);
5148    put.add(kv2);
5149    region.put(put);
5150    put = new Put(rowC);
5151    put.add(kv3);
5152    region.put(put);
5153    put = new Put(rowD);
5154    put.add(kv4_1);
5155    region.put(put);
5156    put = new Put(rowD);
5157    put.add(kv4_2);
5158    region.put(put);
5159    put = new Put(rowE);
5160    put.add(kv5);
5161    region.put(put);
5162    region.flush(true);
5163    Scan scan = new Scan(rowD, rowA);
5164    scan.addColumn(families[0], col1);
5165    scan.setReversed(true);
5166    List<Cell> currRow = new ArrayList<>();
5167    InternalScanner scanner = region.getScanner(scan);
5168    boolean hasNext = scanner.next(currRow);
5169    assertEquals(1, currRow.size());
5170    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5171        .get(0).getRowLength(), rowD, 0, rowD.length));
5172    assertTrue(hasNext);
5173    currRow.clear();
5174    hasNext = scanner.next(currRow);
5175    assertEquals(1, currRow.size());
5176    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5177        .get(0).getRowLength(), rowC, 0, rowC.length));
5178    assertTrue(hasNext);
5179    currRow.clear();
5180    hasNext = scanner.next(currRow);
5181    assertEquals(1, currRow.size());
5182    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5183        .get(0).getRowLength(), rowB, 0, rowB.length));
5184    assertFalse(hasNext);
5185    scanner.close();
5186
5187    scan = new Scan(rowD, rowA);
5188    scan.addColumn(families[0], col2);
5189    scan.setReversed(true);
5190    currRow.clear();
5191    scanner = region.getScanner(scan);
5192    hasNext = scanner.next(currRow);
5193    assertEquals(1, currRow.size());
5194    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5195        .get(0).getRowLength(), rowD, 0, rowD.length));
5196    scanner.close();
5197  }
5198
5199  @Test
5200  public void testReverseScanner_smaller_blocksize() throws IOException {
5201    // case to ensure no conflict with HFile index optimization
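    // Same data set and assertions as testReverseScanner_moreRowsMayExistAfter, but with
    // "test.block.size" set to 1, presumably to force very small HFile blocks.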
5202    byte[] rowA = Bytes.toBytes("rowA");
5203    byte[] rowB = Bytes.toBytes("rowB");
5204    byte[] rowC = Bytes.toBytes("rowC");
5205    byte[] rowD = Bytes.toBytes("rowD");
5206    byte[] rowE = Bytes.toBytes("rowE");
5207    byte[] cf = Bytes.toBytes("CF");
5208    byte[][] families = { cf };
5209    byte[] col1 = Bytes.toBytes("col1");
5210    byte[] col2 = Bytes.toBytes("col2");
5211    long ts = 1;
5212    HBaseConfiguration config = new HBaseConfiguration();
5213    config.setInt("test.block.size", 1);
5214    this.region = initHRegion(tableName, method, config, families);
5215    KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5216    KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5217    KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5218    KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5219    KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5220    KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5221    Put put = null;
5222    put = new Put(rowA);
5223    put.add(kv1);
5224    region.put(put);
5225    put = new Put(rowB);
5226    put.add(kv2);
5227    region.put(put);
5228    put = new Put(rowC);
5229    put.add(kv3);
5230    region.put(put);
5231    put = new Put(rowD);
5232    put.add(kv4_1);
5233    region.put(put);
5234    put = new Put(rowD);
5235    put.add(kv4_2);
5236    region.put(put);
5237    put = new Put(rowE);
5238    put.add(kv5);
5239    region.put(put);
5240    region.flush(true);
5241    Scan scan = new Scan(rowD, rowA);
5242    scan.addColumn(families[0], col1);
5243    scan.setReversed(true);
5244    List<Cell> currRow = new ArrayList<>();
5245    InternalScanner scanner = region.getScanner(scan);
5246    boolean hasNext = scanner.next(currRow);
5247    assertEquals(1, currRow.size());
5248    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5249        .get(0).getRowLength(), rowD, 0, rowD.length));
5250    assertTrue(hasNext);
5251    currRow.clear();
5252    hasNext = scanner.next(currRow);
5253    assertEquals(1, currRow.size());
5254    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5255        .get(0).getRowLength(), rowC, 0, rowC.length));
5256    assertTrue(hasNext);
5257    currRow.clear();
5258    hasNext = scanner.next(currRow);
5259    assertEquals(1, currRow.size());
5260    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5261        .get(0).getRowLength(), rowB, 0, rowB.length));
5262    assertFalse(hasNext);
5263    scanner.close();
5264
5265    scan = new Scan(rowD, rowA);
5266    scan.addColumn(families[0], col2);
5267    scan.setReversed(true);
5268    currRow.clear();
5269    scanner = region.getScanner(scan);
5270    hasNext = scanner.next(currRow);
5271    assertEquals(1, currRow.size());
5272    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5273        .get(0).getRowLength(), rowD, 0, rowD.length));
5274    scanner.close();
5275  }
5276
5277  @Test
5278  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
5279      throws IOException {
5280    byte[] row0 = Bytes.toBytes("row0"); // 1 kv
5281    byte[] row1 = Bytes.toBytes("row1"); // 2 kv
5282    byte[] row2 = Bytes.toBytes("row2"); // 4 kv
5283    byte[] row3 = Bytes.toBytes("row3"); // 2 kv
5284    byte[] row4 = Bytes.toBytes("row4"); // 5 kv
5285    byte[] row5 = Bytes.toBytes("row5"); // 2 kv
5286    byte[] cf1 = Bytes.toBytes("CF1");
5287    byte[] cf2 = Bytes.toBytes("CF2");
5288    byte[] cf3 = Bytes.toBytes("CF3");
5289    byte[][] families = { cf1, cf2, cf3 };
5290    byte[] col = Bytes.toBytes("C");
5291    long ts = 1;
5292    HBaseConfiguration conf = new HBaseConfiguration();
5293    // disable compactions in this test.
5294    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5295    this.region = initHRegion(tableName, method, conf, families);
5296    // kv naming style: kv<rowNumber>_<totalKvCountInThisRow>_<seqNo>
5297    KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
5298        null);
5299    KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
5300        null);
5301    KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
5302        KeyValue.Type.Put, null);
5303    KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
5304        null);
5305    KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
5306        null);
5307    KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
5308        null);
5309    KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
5310        KeyValue.Type.Put, null);
5311    KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
5312        null);
5313    KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
5314        KeyValue.Type.Put, null);
5315    KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
5316        null);
5317    KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
5318        null);
5319    KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
5320        KeyValue.Type.Put, null);
5321    KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
5322        null);
5323    KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
5324        KeyValue.Type.Put, null);
5325    KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
5326        null);
5327    KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
5328        null);
5329    // hfiles(cf1/cf2): "row1" (1 kv) / "row2" (1 kv) / "row4" (2 kv)
5330    Put put = null;
5331    put = new Put(row1);
5332    put.add(kv1_2_1);
5333    region.put(put);
5334    put = new Put(row2);
5335    put.add(kv2_4_1);
5336    region.put(put);
5337    put = new Put(row4);
5338    put.add(kv4_5_4);
5339    put.add(kv4_5_5);
5340    region.put(put);
5341    region.flush(true);
5342    // hfiles(cf1/cf3): "row1" (1 kv) / "row2" (1 kv) / "row4" (2 kv)
5343    put = new Put(row4);
5344    put.add(kv4_5_1);
5345    put.add(kv4_5_3);
5346    region.put(put);
5347    put = new Put(row1);
5348    put.add(kv1_2_2);
5349    region.put(put);
5350    put = new Put(row2);
5351    put.add(kv2_4_4);
5352    region.put(put);
5353    region.flush(true);
5354    // hfiles(cf1/cf3): "row2" (2 kv) / "row3" (1 kv) / "row4" (1 kv)
5355    put = new Put(row4);
5356    put.add(kv4_5_2);
5357    region.put(put);
5358    put = new Put(row2);
5359    put.add(kv2_4_2);
5360    put.add(kv2_4_3);
5361    region.put(put);
5362    put = new Put(row3);
5363    put.add(kv3_2_2);
5364    region.put(put);
5365    region.flush(true);
5366    // memstore(cf1/cf2/cf3) : "row0" (1 kv) / "row3" (1 kv) / "row5" (2 kv)
5367    // "row5" is the largest row and falls outside the scan range below
5368    put = new Put(row0);
5369    put.add(kv0_1_1);
5370    region.put(put);
5371    put = new Put(row3);
5372    put.add(kv3_2_1);
5373    region.put(put);
5374    put = new Put(row5);
5375    put.add(kv5_2_1);
5376    put.add(kv5_2_2);
5377    region.put(put);
5378    // scan range = ["row4", min), skip the max "row5"
5379    Scan scan = new Scan(row4);
5380    scan.setMaxVersions(5);
5381    scan.setBatch(3);
5382    scan.setReversed(true);
5383    InternalScanner scanner = region.getScanner(scan);
5384    List<Cell> currRow = new ArrayList<>();
5385    boolean hasNext = false;
5386    // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
5387    // included in scan range
5388    // "row4" takes 2 next() calls since batch=3
5389    hasNext = scanner.next(currRow);
5390    assertEquals(3, currRow.size());
5391    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5392        .get(0).getRowLength(), row4, 0, row4.length));
5393    assertTrue(hasNext);
5394    currRow.clear();
5395    hasNext = scanner.next(currRow);
5396    assertEquals(2, currRow.size());
5397    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
5398        currRow.get(0).getRowLength(), row4, 0,
5399      row4.length));
5400    assertTrue(hasNext);
5401    // 2. scan out "row3" (2 kv)
5402    currRow.clear();
5403    hasNext = scanner.next(currRow);
5404    assertEquals(2, currRow.size());
5405    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5406        .get(0).getRowLength(), row3, 0, row3.length));
5407    assertTrue(hasNext);
5408    // 3. scan out "row2" (4 kvs)
5409    // "row2" takes 2 next() calls since batch=3
5410    currRow.clear();
5411    hasNext = scanner.next(currRow);
5412    assertEquals(3, currRow.size());
5413    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5414        .get(0).getRowLength(), row2, 0, row2.length));
5415    assertTrue(hasNext);
5416    currRow.clear();
5417    hasNext = scanner.next(currRow);
5418    assertEquals(1, currRow.size());
5419    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5420        .get(0).getRowLength(), row2, 0, row2.length));
5421    assertTrue(hasNext);
5422    // 4. scan out "row1" (2 kv)
5423    currRow.clear();
5424    hasNext = scanner.next(currRow);
5425    assertEquals(2, currRow.size());
5426    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5427        .get(0).getRowLength(), row1, 0, row1.length));
5428    assertTrue(hasNext);
5429    // 5. scan out "row0" (1 kv)
5430    currRow.clear();
5431    hasNext = scanner.next(currRow);
5432    assertEquals(1, currRow.size());
5433    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5434        .get(0).getRowLength(), row0, 0, row0.length));
5435    assertFalse(hasNext);
5436
5437    scanner.close();
5438  }
5439
5440  @Test
5441  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
5442      throws IOException {
5443    byte[] row1 = Bytes.toBytes("row1");
5444    byte[] row2 = Bytes.toBytes("row2");
5445    byte[] row3 = Bytes.toBytes("row3");
5446    byte[] row4 = Bytes.toBytes("row4");
5447    byte[] cf1 = Bytes.toBytes("CF1");
5448    byte[] cf2 = Bytes.toBytes("CF2");
5449    byte[] cf3 = Bytes.toBytes("CF3");
5450    byte[] cf4 = Bytes.toBytes("CF4");
5451    byte[][] families = { cf1, cf2, cf3, cf4 };
5452    byte[] col = Bytes.toBytes("C");
5453    long ts = 1;
5454    HBaseConfiguration conf = new HBaseConfiguration();
5455    // disable compactions in this test.
5456    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5457    this.region = initHRegion(tableName, method, conf, families);
5458    KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
5459    KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
5460    KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
5461    KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
5462    // storefile1
5463    Put put = new Put(row1);
5464    put.add(kv1);
5465    region.put(put);
5466    region.flush(true);
5467    // storefile2
5468    put = new Put(row2);
5469    put.add(kv2);
5470    region.put(put);
5471    region.flush(true);
5472    // storefile3
5473    put = new Put(row3);
5474    put.add(kv3);
5475    region.put(put);
5476    region.flush(true);
5477    // memstore
5478    put = new Put(row4);
5479    put.add(kv4);
5480    region.put(put);
5481    // scan range = ["row4", min)
5482    Scan scan = new Scan(row4);
5483    scan.setReversed(true);
5484    scan.setBatch(10);
5485    InternalScanner scanner = region.getScanner(scan);
5486    List<Cell> currRow = new ArrayList<>();
5487    boolean hasNext = scanner.next(currRow);
5488    assertEquals(1, currRow.size());
5489    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5490        .get(0).getRowLength(), row4, 0, row4.length));
5491    assertTrue(hasNext);
5492    currRow.clear();
5493    hasNext = scanner.next(currRow);
5494    assertEquals(1, currRow.size());
5495    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5496        .get(0).getRowLength(), row3, 0, row3.length));
5497    assertTrue(hasNext);
5498    currRow.clear();
5499    hasNext = scanner.next(currRow);
5500    assertEquals(1, currRow.size());
5501    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5502        .get(0).getRowLength(), row2, 0, row2.length));
5503    assertTrue(hasNext);
5504    currRow.clear();
5505    hasNext = scanner.next(currRow);
5506    assertEquals(1, currRow.size());
5507    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5508        .get(0).getRowLength(), row1, 0, row1.length));
5509    assertFalse(hasNext);
5510  }
5511
5512  /**
5513   * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking
5514   */
5515  @Test
5516  public void testReverseScanner_StackOverflow() throws IOException {
5517    byte[] cf1 = Bytes.toBytes("CF1");
5518    byte[][] families = {cf1};
5519    byte[] col = Bytes.toBytes("C");
5520    HBaseConfiguration conf = new HBaseConfiguration();
5521    this.region = initHRegion(tableName, method, conf, families);
5522    // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5523    Put put = new Put(Bytes.toBytes("19998"));
5524    put.addColumn(cf1, col, Bytes.toBytes("val"));
5525    region.put(put);
5526    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5527    Put put2 = new Put(Bytes.toBytes("19997"));
5528    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5529    region.put(put2);
5530
5531    Scan scan = new Scan(Bytes.toBytes("19998"));
5532    scan.setReversed(true);
5533    InternalScanner scanner = region.getScanner(scan);
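    // the scanner's readPt is fixed here, so the bulk rows written below are not visible to it;
    // HBASE-14497 was a StackOverflowError hit while skipping such rows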
5534
    // create one storefile containing many rows that will be skipped,
    // to exercise StoreFileScanner.seekToPreviousRow
    for (int i = 10000; i < 20000; i++) {
      Put p = new Put(Bytes.toBytes("" + i));
      p.addColumn(cf1, col, Bytes.toBytes("" + i));
      region.put(p);
    }
5542    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5543
    // fill the memstore with many rows that will be skipped,
    // to exercise MemStoreScanner.seekToPreviousRow
    for (int i = 10000; i < 20000; i++) {
      Put p = new Put(Bytes.toBytes("" + i));
      p.addColumn(cf1, col, Bytes.toBytes("" + i));
      region.put(p);
    }
5551
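    // drain the scanner; only the two rows written before it was opened ("19998" and "19997")
    // should come back, despite the many newer rows added above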
5552    List<Cell> currRow = new ArrayList<>();
5553    boolean hasNext;
5554    do {
5555      hasNext = scanner.next(currRow);
5556    } while (hasNext);
5557    assertEquals(2, currRow.size());
5558    assertEquals("19998", Bytes.toString(currRow.get(0).getRowArray(),
5559      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5560    assertEquals("19997", Bytes.toString(currRow.get(1).getRowArray(),
5561      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5562  }
5563
5564  @Test
5565  public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception {
5566    byte[] cf1 = Bytes.toBytes("CF1");
5567    byte[][] families = { cf1 };
5568    byte[] col = Bytes.toBytes("C");
    Configuration conf = HBaseConfiguration.create();
5570    this.region = initHRegion(tableName, method, conf, families);
    // set up two rows in the memstore, then create a scanner so that it captures an earlier readPt
5572    Put put = new Put(Bytes.toBytes("19996"));
5573    put.addColumn(cf1, col, Bytes.toBytes("val"));
5574    region.put(put);
5575    Put put2 = new Put(Bytes.toBytes("19995"));
5576    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5577    region.put(put2);
5578    // create a reverse scan
5579    Scan scan = new Scan(Bytes.toBytes("19996"));
5580    scan.setReversed(true);
5581    RegionScannerImpl scanner = region.getScanner(scan);
5582
5583    // flush the cache. This will reset the store scanner
5584    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5585
    // fill the memstore with many rows that will be skipped,
    // to exercise MemStoreScanner.seekToPreviousRow
5588    for (int i = 10000; i < 20000; i++) {
5589      Put p = new Put(Bytes.toBytes("" + i));
5590      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5591      region.put(p);
5592    }
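    // drain the scanner; on the first batch, verify that the reset store scanner wraps only the
    // store file scanner and no memstore scanner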
5593    List<Cell> currRow = new ArrayList<>();
5594    boolean hasNext;
5595    boolean assertDone = false;
5596    do {
5597      hasNext = scanner.next(currRow);
5598      // With HBASE-15871, after the scanner is reset the memstore scanner should not be
5599      // added here
5600      if (!assertDone) {
5601        StoreScanner current =
5602            (StoreScanner) (scanner.storeHeap).getCurrentForTesting();
5603        List<KeyValueScanner> scanners = current.getAllScannersForTesting();
        assertEquals("There should be only one scanner, the store file scanner", 1,
          scanners.size());
5606        assertDone = true;
5607      }
5608    } while (hasNext);
5609    assertEquals(2, currRow.size());
5610    assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(),
5611      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5612    assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(),
5613      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5614  }
5615
5616  @Test
5617  public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception {
5618    byte[] cf1 = Bytes.toBytes("CF1");
5619    byte[][] families = { cf1 };
5620    byte[] col = Bytes.toBytes("C");
5621
    Configuration conf = HBaseConfiguration.create();
5623    this.region = initHRegion(tableName, method, conf, families);
5624
5625    Put put = new Put(Bytes.toBytes("199996"));
5626    put.addColumn(cf1, col, Bytes.toBytes("val"));
5627    region.put(put);
5628    Put put2 = new Put(Bytes.toBytes("199995"));
5629    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5630    region.put(put2);
5631
5632    // Create a reverse scan
5633    Scan scan = new Scan(Bytes.toBytes("199996"));
5634    scan.setReversed(true);
5635    RegionScannerImpl scanner = region.getScanner(scan);
5636
    // Put a lot of cells that have sequenceIDs greater than the readPt of the reverse scan
5638    for (int i = 100000; i < 200000; i++) {
5639      Put p = new Put(Bytes.toBytes("" + i));
5640      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5641      region.put(p);
5642    }
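    // the cells written after the scanner was opened have sequence ids above its readPt, so only
    // the two original rows should be returned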
5643    List<Cell> currRow = new ArrayList<>();
5644    boolean hasNext;
5645    do {
5646      hasNext = scanner.next(currRow);
5647    } while (hasNext);
5648
5649    assertEquals(2, currRow.size());
5650    assertEquals("199996", Bytes.toString(currRow.get(0).getRowArray(),
5651      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5652    assertEquals("199995", Bytes.toString(currRow.get(1).getRowArray(),
5653      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5654  }
5655
5656  @Test
5657  public void testWriteRequestsCounter() throws IOException {
5658    byte[] fam = Bytes.toBytes("info");
5659    byte[][] families = { fam };
5660    this.region = initHRegion(tableName, method, CONF, families);
5661
5662    Assert.assertEquals(0L, region.getWriteRequestsCount());
5663
5664    Put put = new Put(row);
5665    put.addColumn(fam, fam, fam);
5666
5667    Assert.assertEquals(0L, region.getWriteRequestsCount());
5668    region.put(put);
5669    Assert.assertEquals(1L, region.getWriteRequestsCount());
5670    region.put(put);
5671    Assert.assertEquals(2L, region.getWriteRequestsCount());
5672    region.put(put);
5673    Assert.assertEquals(3L, region.getWriteRequestsCount());
5674
5675    region.delete(new Delete(row));
5676    Assert.assertEquals(4L, region.getWriteRequestsCount());
5677  }
5678
5679  @Test
5680  public void testOpenRegionWrittenToWAL() throws Exception {
5681    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
5682    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5683
5684    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
5685      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
5686      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
5687    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
5688
5689    // open the region w/o rss and wal and flush some files
5690    region =
5691         HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
5692             .getConfiguration(), htd);
5693    assertNotNull(region);
5694
5695    // create a file in fam1 for the region before opening in OpenRegionHandler
5696    region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1));
5697    region.flush(true);
5698    HBaseTestingUtility.closeRegionAndWAL(region);
5699
5700    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5701
    // capture appendMarker() calls
5703    WAL wal = mockWAL();
5704    when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
5705
5706    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5707      TEST_UTIL.getConfiguration(), rss, null);
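    // opening the region should append exactly one REGION_OPEN marker to the WAL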
5708
5709    verify(wal, times(1)).appendMarker(any(RegionInfo.class), any(WALKeyImpl.class),
5710      editCaptor.capture());
5711
5712    WALEdit edit = editCaptor.getValue();
5713    assertNotNull(edit);
5714    assertNotNull(edit.getCells());
5715    assertEquals(1, edit.getCells().size());
5716    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5717    assertNotNull(desc);
5718
5719    LOG.info("RegionEventDescriptor from WAL: " + desc);
5720
5721    assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
5722    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
5723    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5724      hri.getEncodedNameAsBytes()));
5725    assertTrue(desc.getLogSequenceNumber() > 0);
5726    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5727    assertEquals(2, desc.getStoresCount());
5728
5729    StoreDescriptor store = desc.getStores(0);
5730    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
    assertEquals(Bytes.toString(fam1), store.getStoreHomeDir());
    assertEquals(1, store.getStoreFileCount()); // 1 store file
5733    assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
5734
5735    store = desc.getStores(1);
5736    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
    assertEquals(Bytes.toString(fam2), store.getStoreHomeDir());
5738    assertEquals(0, store.getStoreFileCount()); // no store files
5739  }
5740
  // Helper for the testOpenRegionWrittenToWALForLogReplay test
5742  static class HRegionWithSeqId extends HRegion {
5743    public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
5744        final Configuration confParam, final RegionInfo regionInfo,
5745        final TableDescriptor htd, final RegionServerServices rsServices) {
5746      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
5747    }
5748    @Override
5749    protected long getNextSequenceId(WAL wal) throws IOException {
5750      return 42;
5751    }
5752  }
5753
5754  @Test
5755  public void testFlushedFileWithNoTags() throws Exception {
5756    final TableName tableName = TableName.valueOf(name.getMethodName());
5757    HTableDescriptor htd = new HTableDescriptor(tableName);
5758    htd.addFamily(new HColumnDescriptor(fam1));
5759    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
5760    Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName());
5761    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
5762    Put put = new Put(Bytes.toBytes("a-b-0-0"));
5763    put.addColumn(fam1, qual1, Bytes.toBytes("c1-value"));
5764    region.put(put);
5765    region.flush(true);
5766    HStore store = region.getStore(fam1);
5767    Collection<HStoreFile> storefiles = store.getStorefiles();
5768    for (HStoreFile sf : storefiles) {
      assertFalse("Tags should not be present",
        sf.getReader().getHFileReader().getFileContext().isIncludesTags());
5771    }
5772  }
5773
5774  /**
   * Utility method to set up a WAL mock.
5776   * <p/>
   * Needs to begin an MVCC write entry and attach it to the WALKeyImpl on append, else the test
   * hangs.
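   * <p/>
   * Typical usage, as in the tests in this class:
   * <pre>
   *   WAL wal = mockWAL();
   *   when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
   * </pre>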
5778   * @return a mock WAL
5779   */
5780  private WAL mockWAL() throws IOException {
5781    WAL wal = mock(WAL.class);
5782    when(wal.appendData(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class)))
5783      .thenAnswer(new Answer<Long>() {
5784        @Override
5785        public Long answer(InvocationOnMock invocation) throws Throwable {
5786          WALKeyImpl key = invocation.getArgument(1);
5787          MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
5788          key.setWriteEntry(we);
5789          return 1L;
5790        }
5791      });
5792    when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))).
5793        thenAnswer(new Answer<Long>() {
5794          @Override
5795          public Long answer(InvocationOnMock invocation) throws Throwable {
5796            WALKeyImpl key = invocation.getArgument(1);
5797            MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
5798            key.setWriteEntry(we);
5799            return 1L;
5800          }
5801        });
5802    return wal;
5803  }
5804
5805  @Test
5806  public void testCloseRegionWrittenToWAL() throws Exception {
5807    Path rootDir = new Path(dir + name.getMethodName());
5808    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
5809
5810    final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
5811    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5812
5813    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
5814      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
5815      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
5816    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
5817
5818    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5819
    // capture appendMarker() calls
5821    WAL wal = mockWAL();
5822    when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
5823
5824
5825    // create and then open a region first so that it can be closed later
5826    region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri));
5827    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5828      TEST_UTIL.getConfiguration(), rss, null);
5829
5830    // close the region
5831    region.close(false);
5832
    // two times: once for the region open marker and once for the region close marker
5834    verify(wal, times(2)).appendMarker(any(RegionInfo.class),
5835        (WALKeyImpl) any(WALKeyImpl.class), editCaptor.capture());
5836
5837    WALEdit edit = editCaptor.getAllValues().get(1);
5838    assertNotNull(edit);
5839    assertNotNull(edit.getCells());
5840    assertEquals(1, edit.getCells().size());
5841    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5842    assertNotNull(desc);
5843
5844    LOG.info("RegionEventDescriptor from WAL: " + desc);
5845
5846    assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType());
5847    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
5848    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5849      hri.getEncodedNameAsBytes()));
5850    assertTrue(desc.getLogSequenceNumber() > 0);
5851    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5852    assertEquals(2, desc.getStoresCount());
5853
5854    StoreDescriptor store = desc.getStores(0);
5855    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
    assertEquals(Bytes.toString(fam1), store.getStoreHomeDir());
5857    assertEquals(0, store.getStoreFileCount()); // no store files
5858
5859    store = desc.getStores(1);
5860    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
    assertEquals(Bytes.toString(fam2), store.getStoreHomeDir());
5862    assertEquals(0, store.getStoreFileCount()); // no store files
5863  }
5864
5865  /**
   * Test that RegionTooBusyException is thrown when the region is busy
5867   */
5868  @Test
5869  public void testRegionTooBusy() throws IOException {
5870    byte[] family = Bytes.toBytes("family");
5871    long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration",
5872      HRegion.DEFAULT_BUSY_WAIT_DURATION);
5873    CONF.setLong("hbase.busy.wait.duration", 1000);
5874    region = initHRegion(tableName, method, CONF, family);
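    // the background thread grabs the region's write lock and holds it until 'stopped' is set
    // back to true, so the get() below should time out with RegionTooBusyException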
5875    final AtomicBoolean stopped = new AtomicBoolean(true);
5876    Thread t = new Thread(new Runnable() {
5877      @Override
5878      public void run() {
5879        try {
5880          region.lock.writeLock().lock();
5881          stopped.set(false);
5882          while (!stopped.get()) {
5883            Thread.sleep(100);
5884          }
5885        } catch (InterruptedException ie) {
5886        } finally {
5887          region.lock.writeLock().unlock();
5888        }
5889      }
5890    });
5891    t.start();
5892    Get get = new Get(row);
5893    try {
5894      while (stopped.get()) {
5895        Thread.sleep(100);
5896      }
5897      region.get(get);
5898      fail("Should throw RegionTooBusyException");
5899    } catch (InterruptedException ie) {
5900      fail("test interrupted");
5901    } catch (RegionTooBusyException e) {
5902      // Good, expected
5903    } finally {
5904      stopped.set(true);
5905      try {
5906        t.join();
5907      } catch (Throwable e) {
5908      }
5909
5910      HBaseTestingUtility.closeRegionAndWAL(region);
5911      region = null;
5912      CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration);
5913    }
5914  }
5915
5916  @Test
5917  public void testCellTTLs() throws IOException {
5918    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
5919    EnvironmentEdgeManager.injectEdge(edge);
5920
5921    final byte[] row = Bytes.toBytes("testRow");
5922    final byte[] q1 = Bytes.toBytes("q1");
5923    final byte[] q2 = Bytes.toBytes("q2");
5924    final byte[] q3 = Bytes.toBytes("q3");
5925    final byte[] q4 = Bytes.toBytes("q4");
5926
5927    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
5928    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
5929    hcd.setTimeToLive(10); // 10 seconds
5930    htd.addFamily(hcd);
5931
5932    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
5933    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
5934
5935    region = HBaseTestingUtility.createRegionAndWAL(new HRegionInfo(htd.getTableName(),
5936            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
5937        TEST_UTIL.getDataTestDir(), conf, htd);
5938    assertNotNull(region);
5939    long now = EnvironmentEdgeManager.currentTime();
5940    // Add a cell that will expire in 5 seconds via cell TTL
5941    region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
5942      HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
        // TTL tags specify the TTL in milliseconds
5944        new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) })));
5945    // Add a cell that will expire after 10 seconds via family setting
5946    region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
5947    // Add a cell that will expire in 15 seconds via cell TTL
5948    region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
5949      HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
        // TTL tags specify the TTL in milliseconds
5951        new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) })));
5952    // Add a cell that will expire in 20 seconds via family setting
5953    region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
5954
5955    // Flush so we are sure store scanning gets this right
5956    region.flush(true);
5957
5958    // A query at time T+0 should return all cells
5959    Result r = region.get(new Get(row));
5960    assertNotNull(r.getValue(fam1, q1));
5961    assertNotNull(r.getValue(fam1, q2));
5962    assertNotNull(r.getValue(fam1, q3));
5963    assertNotNull(r.getValue(fam1, q4));
5964
5965    // Increment time to T+5 seconds
5966    edge.incrementTime(5000);
5967
5968    r = region.get(new Get(row));
5969    assertNull(r.getValue(fam1, q1));
5970    assertNotNull(r.getValue(fam1, q2));
5971    assertNotNull(r.getValue(fam1, q3));
5972    assertNotNull(r.getValue(fam1, q4));
5973
5974    // Increment time to T+10 seconds
5975    edge.incrementTime(5000);
5976
5977    r = region.get(new Get(row));
5978    assertNull(r.getValue(fam1, q1));
5979    assertNull(r.getValue(fam1, q2));
5980    assertNotNull(r.getValue(fam1, q3));
5981    assertNotNull(r.getValue(fam1, q4));
5982
5983    // Increment time to T+15 seconds
5984    edge.incrementTime(5000);
5985
5986    r = region.get(new Get(row));
5987    assertNull(r.getValue(fam1, q1));
5988    assertNull(r.getValue(fam1, q2));
5989    assertNull(r.getValue(fam1, q3));
5990    assertNotNull(r.getValue(fam1, q4));
5991
5992    // Increment time to T+20 seconds
5993    edge.incrementTime(10000);
5994
5995    r = region.get(new Get(row));
5996    assertNull(r.getValue(fam1, q1));
5997    assertNull(r.getValue(fam1, q2));
5998    assertNull(r.getValue(fam1, q3));
5999    assertNull(r.getValue(fam1, q4));
6000
6001    // Fun with disappearing increments
6002
6003    // Start at 1
6004    region.put(new Put(row).addColumn(fam1, q1, Bytes.toBytes(1L)));
6005    r = region.get(new Get(row));
6006    byte[] val = r.getValue(fam1, q1);
6007    assertNotNull(val);
6008    assertEquals(1L, Bytes.toLong(val));
6009
6010    // Increment with a TTL of 5 seconds
6011    Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
6012    incr.setTTL(5000);
6013    region.increment(incr); // 2
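    // the 5 second TTL applies only to the cell written by this increment; once that cell
    // expires, the earlier value (1) becomes visible again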
6014
6015    // New value should be 2
6016    r = region.get(new Get(row));
6017    val = r.getValue(fam1, q1);
6018    assertNotNull(val);
6019    assertEquals(2L, Bytes.toLong(val));
6020
6021    // Increment time to T+25 seconds
6022    edge.incrementTime(5000);
6023
6024    // Value should be back to 1
6025    r = region.get(new Get(row));
6026    val = r.getValue(fam1, q1);
6027    assertNotNull(val);
6028    assertEquals(1L, Bytes.toLong(val));
6029
6030    // Increment time to T+30 seconds
6031    edge.incrementTime(5000);
6032
6033    // Original value written at T+20 should be gone now via family TTL
6034    r = region.get(new Get(row));
6035    assertNull(r.getValue(fam1, q1));
6036  }
6037
6038  @Test
6039  public void testIncrementTimestampsAreMonotonic() throws IOException {
6040    region = initHRegion(tableName, method, CONF, fam1);
6041    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6042    EnvironmentEdgeManager.injectEdge(edge);
6043
6044    edge.setValue(10);
6045    Increment inc = new Increment(row);
6046    inc.setDurability(Durability.SKIP_WAL);
6047    inc.addColumn(fam1, qual1, 1L);
6048    region.increment(inc);
6049
6050    Result result = region.get(new Get(row));
6051    Cell c = result.getColumnLatestCell(fam1, qual1);
6052    assertNotNull(c);
6053    assertEquals(10L, c.getTimestamp());
6054
6055    edge.setValue(1); // clock goes back
6056    region.increment(inc);
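    // even though the clock went backwards, the increment above must still produce a larger
    // timestamp (10 -> 11)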
6057    result = region.get(new Get(row));
6058    c = result.getColumnLatestCell(fam1, qual1);
6059    assertEquals(11L, c.getTimestamp());
6060    assertEquals(2L, Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6061  }
6062
6063  @Test
6064  public void testAppendTimestampsAreMonotonic() throws IOException {
6065    region = initHRegion(tableName, method, CONF, fam1);
6066    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6067    EnvironmentEdgeManager.injectEdge(edge);
6068
6069    edge.setValue(10);
6070    Append a = new Append(row);
6071    a.setDurability(Durability.SKIP_WAL);
6072    a.addColumn(fam1, qual1, qual1);
6073    region.append(a);
6074
6075    Result result = region.get(new Get(row));
6076    Cell c = result.getColumnLatestCell(fam1, qual1);
6077    assertNotNull(c);
6078    assertEquals(10L, c.getTimestamp());
6079
6080    edge.setValue(1); // clock goes back
6081    region.append(a);
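    // as with increment, the append's timestamp must still advance (10 -> 11) even though the
    // clock went back; the stored value becomes qual1 appended to itself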
6082    result = region.get(new Get(row));
6083    c = result.getColumnLatestCell(fam1, qual1);
6084    assertEquals(11L, c.getTimestamp());
6085
6086    byte[] expected = new byte[qual1.length*2];
6087    System.arraycopy(qual1, 0, expected, 0, qual1.length);
6088    System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
6089
6090    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6091      expected, 0, expected.length));
6092  }
6093
6094  @Test
6095  public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
6096    region = initHRegion(tableName, method, CONF, fam1);
6097    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6098    EnvironmentEdgeManager.injectEdge(edge);
6099
6100    edge.setValue(10);
6101    Put p = new Put(row);
6102    p.setDurability(Durability.SKIP_WAL);
6103    p.addColumn(fam1, qual1, qual1);
6104    region.put(p);
6105
6106    Result result = region.get(new Get(row));
6107    Cell c = result.getColumnLatestCell(fam1, qual1);
6108    assertNotNull(c);
6109    assertEquals(10L, c.getTimestamp());
6110
6111    edge.setValue(1); // clock goes back
6112    p = new Put(row);
6113    p.setDurability(Durability.SKIP_WAL);
6114    p.addColumn(fam1, qual1, qual2);
6115    region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p);
6116    result = region.get(new Get(row));
6117    c = result.getColumnLatestCell(fam1, qual1);
6118    assertEquals(10L, c.getTimestamp());
6119
6120    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6121      qual2, 0, qual2.length));
6122  }
6123
6124  @Test
6125  public void testBatchMutateWithWrongRegionException() throws Exception {
6126    final byte[] a = Bytes.toBytes("a");
6127    final byte[] b = Bytes.toBytes("b");
6128    final byte[] c = Bytes.toBytes("c"); // exclusive
6129
6130    int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
6131    CONF.setInt("hbase.rowlock.wait.duration", 1000);
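    // use a short row lock timeout for this test; the original value is restored at the end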
6132    region = initHRegion(tableName, a, c, method, CONF, false, fam1);
6133
6134    Mutation[] mutations = new Mutation[] {
6135        new Put(a)
6136            .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6137              .setRow(a)
6138              .setFamily(fam1)
6139              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6140              .setType(Cell.Type.Put)
6141              .build()),
6142        // this is outside the region boundary
6143        new Put(c).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6144              .setRow(c)
6145              .setFamily(fam1)
6146              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6147              .setType(Type.Put)
6148              .build()),
6149        new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6150              .setRow(b)
6151              .setFamily(fam1)
6152              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6153              .setType(Cell.Type.Put)
6154              .build())
6155    };
6156
6157    OperationStatus[] status = region.batchMutate(mutations);
6158    assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6159    assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, status[1].getOperationStatusCode());
6160    assertEquals(OperationStatusCode.SUCCESS, status[2].getOperationStatusCode());
6161
6162
6163    // test with a row lock held for a long time
6164    final CountDownLatch obtainedRowLock = new CountDownLatch(1);
6165    ExecutorService exec = Executors.newFixedThreadPool(2);
6166    Future<Void> f1 = exec.submit(new Callable<Void>() {
6167      @Override
6168      public Void call() throws Exception {
6169        LOG.info("Acquiring row lock");
6170        RowLock rl = region.getRowLock(b);
6171        obtainedRowLock.countDown();
6172        LOG.info("Waiting for 5 seconds before releasing lock");
6173        Threads.sleep(5000);
6174        LOG.info("Releasing row lock");
6175        rl.release();
6176        return null;
6177      }
6178    });
6179    obtainedRowLock.await(30, TimeUnit.SECONDS);
6180
6181    Future<Void> f2 = exec.submit(new Callable<Void>() {
6182      @Override
6183      public Void call() throws Exception {
6184        Mutation[] mutations = new Mutation[] {
6185            new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6186                .setRow(a)
6187                .setFamily(fam1)
6188                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6189                .setType(Cell.Type.Put)
6190                .build()),
6191            new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6192                .setRow(b)
6193                .setFamily(fam1)
6194                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6195                .setType(Cell.Type.Put)
6196                .build()),
6197        };
6198
6199        // this will wait for the row lock, and it will eventually succeed
6200        OperationStatus[] status = region.batchMutate(mutations);
6201        assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6202        assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode());
6203        return null;
6204      }
6205    });
6206
6207    f1.get();
6208    f2.get();
6209
6210    CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
6211  }
6212
6213  @Test
6214  public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
6215    region = initHRegion(tableName, method, CONF, fam1);
6216    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6217    EnvironmentEdgeManager.injectEdge(edge);
6218
6219    edge.setValue(10);
6220    Put p = new Put(row);
6221    p.setDurability(Durability.SKIP_WAL);
6222    p.addColumn(fam1, qual1, qual1);
6223    region.put(p);
6224
6225    Result result = region.get(new Get(row));
6226    Cell c = result.getColumnLatestCell(fam1, qual1);
6227    assertNotNull(c);
6228    assertEquals(10L, c.getTimestamp());
6229
6230    edge.setValue(1); // clock goes back
6231    p = new Put(row);
6232    p.setDurability(Durability.SKIP_WAL);
6233    p.addColumn(fam1, qual1, qual2);
6234    RowMutations rm = new RowMutations(row);
6235    rm.add(p);
6236    assertTrue(region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL,
6237        new BinaryComparator(qual1), rm));
6238    result = region.get(new Get(row));
6239    c = result.getColumnLatestCell(fam1, qual1);
6240    assertEquals(10L, c.getTimestamp());
6241    LOG.info("c value " +
6242      Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6243
6244    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6245      qual2, 0, qual2.length));
6246  }
6247
6248  HRegion initHRegion(TableName tableName, String callingMethod,
6249      byte[]... families) throws IOException {
6250    return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
6251        families);
6252  }
6253
6254  /**
   * HBASE-16429: make sure nothing gets stuck when the WAL writer is rolled while the ring buffer
   * is filled with appends
   * @throws IOException if an IO error occurs during the test
6257   */
6258  @Test
6259  public void testWritesWhileRollWriter() throws IOException {
6260    int testCount = 10;
6261    int numRows = 1024;
6262    int numFamilies = 2;
6263    int numQualifiers = 2;
6264    final byte[][] families = new byte[numFamilies][];
6265    for (int i = 0; i < numFamilies; i++) {
6266      families[i] = Bytes.toBytes("family" + i);
6267    }
6268    final byte[][] qualifiers = new byte[numQualifiers][];
6269    for (int i = 0; i < numQualifiers; i++) {
6270      qualifiers[i] = Bytes.toBytes("qual" + i);
6271    }
6272
6273    CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
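    // shrink the WAL disruptor ring buffer so it fills up quickly during the writer rolls below;
    // the value is reset in the finally block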
6274    this.region = initHRegion(tableName, method, CONF, families);
6275    try {
6276      List<Thread> threads = new ArrayList<>();
6277      for (int i = 0; i < numRows; i++) {
6278        final int count = i;
6279        Thread t = new Thread(new Runnable() {
6280
6281          @Override
6282          public void run() {
6283            byte[] row = Bytes.toBytes("row" + count);
6284            Put put = new Put(row);
6285            put.setDurability(Durability.SYNC_WAL);
6286            byte[] value = Bytes.toBytes(String.valueOf(count));
6287            for (byte[] family : families) {
6288              for (byte[] qualifier : qualifiers) {
6289                put.addColumn(family, qualifier, count, value);
6290              }
6291            }
6292            try {
6293              region.put(put);
6294            } catch (IOException e) {
6295              throw new RuntimeException(e);
6296            }
6297          }
6298        });
6299        threads.add(t);
6300      }
6301      for (Thread t : threads) {
6302        t.start();
6303      }
6304
6305      for (int i = 0; i < testCount; i++) {
6306        region.getWAL().rollWriter();
6307        Thread.yield();
6308      }
6309    } finally {
6310      try {
6311        HBaseTestingUtility.closeRegionAndWAL(this.region);
6312        CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
6313      } catch (DroppedSnapshotException dse) {
        // We may get this on the way out because closing interrupts the background flusher, which
        // can fail anywhere with a DroppedSnapshotException. That failure is not handled cleanly,
        // so memory may still be pending when we get here, memory we cannot flush because the
        // accounting has been off since the original DSE.
6318      }
6319      this.region = null;
6320    }
6321  }
6322
6323  @Test
6324  public void testMutateRow_WriteRequestCount() throws Exception {
6325    byte[] row1 = Bytes.toBytes("row1");
6326    byte[] fam1 = Bytes.toBytes("fam1");
6327    byte[] qf1 = Bytes.toBytes("qualifier");
6328    byte[] val1 = Bytes.toBytes("value1");
6329
6330    RowMutations rm = new RowMutations(row1);
6331    Put put = new Put(row1);
6332    put.addColumn(fam1, qf1, val1);
6333    rm.add(put);
6334
6335    this.region = initHRegion(tableName, method, CONF, fam1);
6336    long wrcBeforeMutate = this.region.writeRequestsCount.longValue();
6337    this.region.mutateRow(rm);
6338    long wrcAfterMutate = this.region.writeRequestsCount.longValue();
6339    Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate);
6340  }
6341
6342  @Test
6343  public void testBulkLoadReplicationEnabled() throws IOException {
6344    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
6345    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
6346    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6347
6348    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
6349    htd.addFamily(new HColumnDescriptor(fam1));
6350    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6351        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6352    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(),
6353        rss, null);
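    // with bulk load replication enabled, the ReplicationObserver coprocessor should have been
    // loaded on the region automatically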
6354
6355    assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false));
6356    String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
6357    String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();
6358    assertTrue(plugins.contains(replicationCoprocessorClass));
6359    assertTrue(region.getCoprocessorHost().
6360        getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
6361  }
6362
6363  /**
   * The same as the HRegion class; the only difference is that instantiateHStore will
   * create a different HStore, namely HStoreForTesting. [HBASE-8518]
6366   */
6367  public static class HRegionForTesting extends HRegion {
6368
6369    public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs,
6370                             final Configuration confParam, final RegionInfo regionInfo,
6371                             final TableDescriptor htd, final RegionServerServices rsServices) {
6372      this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
6373          wal, confParam, htd, rsServices);
6374    }
6375
6376    public HRegionForTesting(HRegionFileSystem fs, WAL wal,
6377                             Configuration confParam, TableDescriptor htd,
6378                             RegionServerServices rsServices) {
6379      super(fs, wal, confParam, htd, rsServices);
6380    }
6381
6382    /**
6383     * Create HStore instance.
6384     * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting.
6385     */
6386    @Override
6387    protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
6388        throws IOException {
6389      if (family.isMobEnabled()) {
6390        if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
6391          throw new IOException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS +
6392              " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY +
6393              " accordingly.");
6394        }
6395        return new HMobStore(this, family, this.conf, warmup);
6396      }
6397      return new HStoreForTesting(this, family, this.conf, warmup);
6398    }
6399  }
6400
6401  /**
   * HStoreForTesting is the same as HStore, except that its doCompaction method honors the
   * "hbase.hstore.compaction.complete" configuration: when the flag is false, the compaction is
   * not allowed to complete, i.e. the newly compacted files are not committed to the store. This
   * check used to live in HStore#compact itself, even though it is only useful for testing.
   * HBASE-8518 removes every use of "hbase.hstore.compaction.complete" outside of test code.
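   * <p/>
   * A test that wants to keep a compaction's newly written files from being committed would
   * typically flip the flag before triggering the compaction, e.g. (a minimal sketch, assuming a
   * test Configuration object named conf is in scope):
   * <pre>
   *   conf.setBoolean("hbase.hstore.compaction.complete", false);
   * </pre>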
6408   */
6409  public static class HStoreForTesting extends HStore {
6410
6411    protected HStoreForTesting(final HRegion region,
6412        final ColumnFamilyDescriptor family,
6413        final Configuration confParam, boolean warmup) throws IOException {
6414      super(region, family, confParam, warmup);
6415    }
6416
6417    @Override
6418    protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
6419        Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
6420        List<Path> newFiles) throws IOException {
      // leave the compaction incomplete if the test configuration asks for it.
      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
        LOG.warn("hbase.hstore.compaction.complete is set to false");
        List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
        final boolean evictOnClose =
            cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
6427        for (Path newFile : newFiles) {
6428          // Create storefile around what we wrote with a reader on it.
6429          HStoreFile sf = createStoreFileAndReader(newFile);
6430          sf.closeStoreFile(evictOnClose);
6431          sfs.add(sf);
6432        }
6433        return sfs;
6434      }
6435      return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles);
6436    }
6437  }
6438}