001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
023import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
024import static org.junit.Assert.assertArrayEquals;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertFalse;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertNull;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031import static org.mockito.ArgumentMatchers.any;
032import static org.mockito.ArgumentMatchers.anyBoolean;
033import static org.mockito.ArgumentMatchers.anyLong;
034import static org.mockito.Mockito.doThrow;
035import static org.mockito.Mockito.mock;
036import static org.mockito.Mockito.never;
037import static org.mockito.Mockito.spy;
038import static org.mockito.Mockito.times;
039import static org.mockito.Mockito.verify;
040import static org.mockito.Mockito.when;
041
042import java.io.IOException;
043import java.io.InterruptedIOException;
044import java.math.BigDecimal;
045import java.nio.charset.StandardCharsets;
046import java.security.PrivilegedExceptionAction;
047import java.util.ArrayList;
048import java.util.Arrays;
049import java.util.Collection;
050import java.util.List;
051import java.util.Map;
052import java.util.NavigableMap;
053import java.util.Objects;
054import java.util.TreeMap;
055import java.util.concurrent.Callable;
056import java.util.concurrent.CountDownLatch;
057import java.util.concurrent.ExecutorService;
058import java.util.concurrent.Executors;
059import java.util.concurrent.Future;
060import java.util.concurrent.TimeUnit;
061import java.util.concurrent.atomic.AtomicBoolean;
062import java.util.concurrent.atomic.AtomicInteger;
063import java.util.concurrent.atomic.AtomicReference;
064import org.apache.commons.lang3.RandomStringUtils;
065import org.apache.hadoop.conf.Configuration;
066import org.apache.hadoop.fs.FSDataOutputStream;
067import org.apache.hadoop.fs.FileStatus;
068import org.apache.hadoop.fs.FileSystem;
069import org.apache.hadoop.fs.Path;
070import org.apache.hadoop.hbase.ArrayBackedTag;
071import org.apache.hadoop.hbase.Cell;
072import org.apache.hadoop.hbase.Cell.Type;
073import org.apache.hadoop.hbase.CellBuilderFactory;
074import org.apache.hadoop.hbase.CellBuilderType;
075import org.apache.hadoop.hbase.CellUtil;
076import org.apache.hadoop.hbase.CompareOperator;
077import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
078import org.apache.hadoop.hbase.DroppedSnapshotException;
079import org.apache.hadoop.hbase.HBaseClassTestRule;
080import org.apache.hadoop.hbase.HBaseConfiguration;
081import org.apache.hadoop.hbase.HBaseTestingUtility;
082import org.apache.hadoop.hbase.HColumnDescriptor;
083import org.apache.hadoop.hbase.HConstants;
084import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
085import org.apache.hadoop.hbase.HDFSBlocksDistribution;
086import org.apache.hadoop.hbase.HRegionInfo;
087import org.apache.hadoop.hbase.HTableDescriptor;
088import org.apache.hadoop.hbase.KeyValue;
089import org.apache.hadoop.hbase.MiniHBaseCluster;
090import org.apache.hadoop.hbase.MultithreadedTestUtil;
091import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
092import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
093import org.apache.hadoop.hbase.NotServingRegionException;
094import org.apache.hadoop.hbase.PrivateCellUtil;
095import org.apache.hadoop.hbase.RegionTooBusyException;
096import org.apache.hadoop.hbase.ServerName;
097import org.apache.hadoop.hbase.StartMiniClusterOption;
098import org.apache.hadoop.hbase.TableName;
099import org.apache.hadoop.hbase.TagType;
100import org.apache.hadoop.hbase.Waiter;
101import org.apache.hadoop.hbase.client.Append;
102import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
103import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
104import org.apache.hadoop.hbase.client.Delete;
105import org.apache.hadoop.hbase.client.Durability;
106import org.apache.hadoop.hbase.client.Get;
107import org.apache.hadoop.hbase.client.Increment;
108import org.apache.hadoop.hbase.client.Mutation;
109import org.apache.hadoop.hbase.client.Put;
110import org.apache.hadoop.hbase.client.RegionInfo;
111import org.apache.hadoop.hbase.client.RegionInfoBuilder;
112import org.apache.hadoop.hbase.client.Result;
113import org.apache.hadoop.hbase.client.RowMutations;
114import org.apache.hadoop.hbase.client.Scan;
115import org.apache.hadoop.hbase.client.Table;
116import org.apache.hadoop.hbase.client.TableDescriptor;
117import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
118import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
119import org.apache.hadoop.hbase.filter.BigDecimalComparator;
120import org.apache.hadoop.hbase.filter.BinaryComparator;
121import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
122import org.apache.hadoop.hbase.filter.Filter;
123import org.apache.hadoop.hbase.filter.FilterBase;
124import org.apache.hadoop.hbase.filter.FilterList;
125import org.apache.hadoop.hbase.filter.NullComparator;
126import org.apache.hadoop.hbase.filter.PrefixFilter;
127import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
128import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
129import org.apache.hadoop.hbase.filter.SubstringComparator;
130import org.apache.hadoop.hbase.filter.ValueFilter;
131import org.apache.hadoop.hbase.io.hfile.HFile;
132import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
133import org.apache.hadoop.hbase.monitoring.MonitoredTask;
134import org.apache.hadoop.hbase.monitoring.TaskMonitor;
135import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
136import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
137import org.apache.hadoop.hbase.regionserver.Region.RowLock;
138import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
139import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
140import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
141import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
142import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
143import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
144import org.apache.hadoop.hbase.security.User;
145import org.apache.hadoop.hbase.test.MetricsAssertHelper;
146import org.apache.hadoop.hbase.testclassification.LargeTests;
147import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
148import org.apache.hadoop.hbase.util.Bytes;
149import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
150import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
151import org.apache.hadoop.hbase.util.FSUtils;
152import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
153import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
154import org.apache.hadoop.hbase.util.Threads;
155import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
156import org.apache.hadoop.hbase.wal.FaultyFSLog;
157import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
158import org.apache.hadoop.hbase.wal.WAL;
159import org.apache.hadoop.hbase.wal.WALEdit;
160import org.apache.hadoop.hbase.wal.WALFactory;
161import org.apache.hadoop.hbase.wal.WALKeyImpl;
162import org.apache.hadoop.hbase.wal.WALProvider;
163import org.apache.hadoop.hbase.wal.WALProvider.Writer;
164import org.apache.hadoop.hbase.wal.WALSplitUtil;
165import org.junit.After;
166import org.junit.Assert;
167import org.junit.Before;
168import org.junit.ClassRule;
169import org.junit.Rule;
170import org.junit.Test;
171import org.junit.experimental.categories.Category;
172import org.junit.rules.ExpectedException;
173import org.junit.rules.TestName;
174import org.mockito.ArgumentCaptor;
175import org.mockito.ArgumentMatcher;
176import org.mockito.Mockito;
177import org.mockito.invocation.InvocationOnMock;
178import org.mockito.stubbing.Answer;
179import org.slf4j.Logger;
180import org.slf4j.LoggerFactory;
181
182import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
183import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
184import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
185import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
186import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
187
188import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
189import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
195
196/**
197 * Basic stand-alone testing of HRegion.  No clusters!
198 *
199 * A lot of the meta information for an HRegion now lives inside other HRegions
200 * or in the HBaseMaster, so only basic testing is possible.
201 */
202@Category({VerySlowRegionServerTests.class, LargeTests.class})
203@SuppressWarnings("deprecation")
204public class TestHRegion {
205
206  @ClassRule
207  public static final HBaseClassTestRule CLASS_RULE =
208      HBaseClassTestRule.forClass(TestHRegion.class);
209
210  // Do not spin up clusters in here. If you need to spin up a cluster, do it
211  // over in TestHRegionOnCluster.
212  private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class);
213  @Rule
214  public TestName name = new TestName();
215  @Rule public final ExpectedException thrown = ExpectedException.none();
216
217  private static final String COLUMN_FAMILY = "MyCF";
218  private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
219  private static final EventLoopGroup GROUP = new NioEventLoopGroup();
220
221  HRegion region = null;
222  // Do not run unit tests in parallel (? Why not?  It don't work?  Why not?  St.Ack)
223  protected static HBaseTestingUtility TEST_UTIL;
224  public static Configuration CONF ;
225  private String dir;
226  private final int MAX_VERSIONS = 2;
227
228  // Test names
229  protected TableName tableName;
230  protected String method;
231  protected final byte[] qual = Bytes.toBytes("qual");
232  protected final byte[] qual1 = Bytes.toBytes("qual1");
233  protected final byte[] qual2 = Bytes.toBytes("qual2");
234  protected final byte[] qual3 = Bytes.toBytes("qual3");
235  protected final byte[] value = Bytes.toBytes("value");
236  protected final byte[] value1 = Bytes.toBytes("value1");
237  protected final byte[] value2 = Bytes.toBytes("value2");
238  protected final byte[] row = Bytes.toBytes("rowA");
239  protected final byte[] row2 = Bytes.toBytes("rowB");
240
241  protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory
242      .getInstance(MetricsAssertHelper.class);
243
244  @Before
245  public void setup() throws IOException {
246    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
247    CONF = TEST_UTIL.getConfiguration();
248    NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, GROUP, NioSocketChannel.class);
249    dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
250    method = name.getMethodName();
251    tableName = TableName.valueOf(method);
252    CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09));
253  }
254
255  @After
256  public void tearDown() throws IOException {
257    // Region may have been closed, but it is still no harm if we close it again here using HTU.
258    HBaseTestingUtility.closeRegionAndWAL(region);
259    EnvironmentEdgeManagerTestHelper.reset();
260    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
261    TEST_UTIL.cleanupTestDir();
262  }
263
264  /**
265   * Test that I can use the max flushed sequence id after the close.
266   * @throws IOException
267   */
268  @Test
269  public void testSequenceId() throws IOException {
270    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
271    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
272    // Weird. This returns 0 if no store files or no edits. Afraid to change it.
273    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
274    HBaseTestingUtility.closeRegionAndWAL(this.region);
275    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
276    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
277    // Open region again.
278    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
279    byte [] value = Bytes.toBytes(method);
280    // Make a random put against our cf.
281    Put put = new Put(value);
282    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
283    region.put(put);
284    // No flush yet so init numbers should still be in place.
285    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
286    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
287    region.flush(true);
288    long max = region.getMaxFlushedSeqId();
289    HBaseTestingUtility.closeRegionAndWAL(this.region);
290    assertEquals(max, region.getMaxFlushedSeqId());
291    this.region = null;
292  }
293
294  /**
295   * Test for Bug 2 of HBASE-10466.
296   * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
297   * is smaller than a certain value, or when region close starts a flush is ongoing, the first
298   * flush is skipped and only the second flush takes place. However, two flushes are required in
299   * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
300   * in current memstore. The fix is removing all conditions except abort check so we ensure 2
301   * flushes for region close."
302   * @throws IOException
303   */
304  @Test
305  public void testCloseCarryingSnapshot() throws IOException {
306    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
307    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
308    // Get some random bytes.
309    byte [] value = Bytes.toBytes(method);
310    // Make a random put against our cf.
311    Put put = new Put(value);
312    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
313    // First put something in current memstore, which will be in snapshot after flusher.prepare()
314    region.put(put);
315    StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
316    storeFlushCtx.prepare();
317    // Second put something in current memstore
318    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
319    region.put(put);
320    // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
321    HBaseTestingUtility.closeRegionAndWAL(region);
322    assertEquals(0, region.getMemStoreDataSize());
323    region = null;
324  }
325
326  /*
327   * This test is for verifying memstore snapshot size is correctly updated in case of rollback
328   * See HBASE-10845
329   */
330  @Test
331  public void testMemstoreSnapshotSize() throws IOException {
332    class MyFaultyFSLog extends FaultyFSLog {
333      StoreFlushContext storeFlushCtx;
334      public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
335          throws IOException {
336        super(fs, rootDir, logName, conf);
337      }
338
339      void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
340        this.storeFlushCtx = storeFlushCtx;
341      }
342
343      @Override
344      public void sync(long txid) throws IOException {
345        storeFlushCtx.prepare();
346        super.sync(txid);
347      }
348    }
349
350    FileSystem fs = FileSystem.get(CONF);
351    Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
352    MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
353    faultyLog.init();
354    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog,
355        COLUMN_FAMILY_BYTES);
356
357    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
358    // Get some random bytes.
359    byte [] value = Bytes.toBytes(method);
360    faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY));
361
362    Put put = new Put(value);
363    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
364    faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
365    boolean threwIOE = false;
366    try {
367      region.put(put);
368    } catch (IOException ioe) {
369      threwIOE = true;
370    } finally {
371      assertTrue("The regionserver should have thrown an exception", threwIOE);
372    }
373    MemStoreSize mss = store.getFlushableSize();
374    assertTrue("flushable size should be zero, but it is " + mss,
375        mss.getDataSize() == 0);
376  }
377
378  /**
379   * Create a WAL outside of the usual helper in
380   * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method
381   * doesn't play nicely with FaultyFileSystem. Call this method before overriding
382   * {@code fs.file.impl}.
383   * @param callingMethod a unique component for the path, probably the name of the test method.
384   */
385  private static WAL createWALCompatibleWithFaultyFileSystem(String callingMethod,
386      Configuration conf, TableName tableName) throws IOException {
387    final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
388    final Configuration walConf = new Configuration(conf);
389    FSUtils.setRootDir(walConf, logDir);
390    return new WALFactory(walConf, callingMethod)
391        .getWAL(RegionInfoBuilder.newBuilder(tableName).build());
392  }
393
394  @Test
395  public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
396    String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
397    FileSystem fs = FileSystem.get(CONF);
398    Path rootDir = new Path(dir + testName);
399    FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
400    hLog.init();
401    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
402        COLUMN_FAMILY_BYTES);
403    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
404    assertEquals(0, region.getMemStoreDataSize());
405
406    // Put one value
407    byte [] value = Bytes.toBytes(method);
408    Put put = new Put(value);
409    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
410    region.put(put);
411    long onePutSize = region.getMemStoreDataSize();
412    assertTrue(onePutSize > 0);
413
414    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
415    doThrow(new IOException())
416       .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
417    region.setCoprocessorHost(mockedCPHost);
418
419    put = new Put(value);
420    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
421    try {
422      region.put(put);
423      fail("Should have failed with IOException");
424    } catch (IOException expected) {
425    }
426    long expectedSize = onePutSize * 2;
427    assertEquals("memstoreSize should be incremented",
428        expectedSize, region.getMemStoreDataSize());
429    assertEquals("flushable size should be incremented",
430        expectedSize, store.getFlushableSize().getDataSize());
431
432    region.setCoprocessorHost(null);
433  }
434
435  /**
436   * A test case of HBASE-21041
437   * @throws Exception Exception
438   */
439  @Test
440  public void testFlushAndMemstoreSizeCounting() throws Exception {
441    byte[] family = Bytes.toBytes("family");
442    this.region = initHRegion(tableName, method, CONF, family);
443    final WALFactory wals = new WALFactory(CONF, method);
444    try {
445      for (byte[] row : HBaseTestingUtility.ROWS) {
446        Put put = new Put(row);
447        put.addColumn(family, family, row);
448        region.put(put);
449      }
450      region.flush(true);
451      // After flush, data size should be zero
452      assertEquals(0, region.getMemStoreDataSize());
453      // After flush, a new active mutable segment is created, so the heap size
454      // should equal to MutableSegment.DEEP_OVERHEAD
455      assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
456      // After flush, offheap should be zero
457      assertEquals(0, region.getMemStoreOffHeapSize());
458    } finally {
459      HBaseTestingUtility.closeRegionAndWAL(this.region);
460      this.region = null;
461      wals.close();
462    }
463  }
464
465  /**
466   * Test we do not lose data if we fail a flush and then close.
467   * Part of HBase-10466.  Tests the following from the issue description:
468   * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
469   * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
470   * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
471   * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
472   * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
473   * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
474   * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
475   * much smaller than expected. In extreme case, if the error accumulates to even bigger than
476   * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
477   * if memstoreSize is not larger than 0."
478   * @throws Exception
479   */
480  @Test
481  public void testFlushSizeAccounting() throws Exception {
482    final Configuration conf = HBaseConfiguration.create(CONF);
483    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
484    // Only retry once.
485    conf.setInt("hbase.hstore.flush.retries.number", 1);
486    final User user =
487      User.createUserForTesting(conf, method, new String[]{"foo"});
488    // Inject our faulty LocalFileSystem
489    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
490    user.runAs(new PrivilegedExceptionAction<Object>() {
491      @Override
492      public Object run() throws Exception {
493        // Make sure it worked (above is sensitive to caching details in hadoop core)
494        FileSystem fs = FileSystem.get(conf);
495        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
496        FaultyFileSystem ffs = (FaultyFileSystem)fs;
497        HRegion region = null;
498        try {
499          // Initialize region
500          region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal,
501              COLUMN_FAMILY_BYTES);
502          long size = region.getMemStoreDataSize();
503          Assert.assertEquals(0, size);
504          // Put one item into memstore.  Measure the size of one item in memstore.
505          Put p1 = new Put(row);
506          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null));
507          region.put(p1);
508          final long sizeOfOnePut = region.getMemStoreDataSize();
509          // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
510          try {
511            LOG.info("Flushing");
512            region.flush(true);
513            Assert.fail("Didn't bubble up IOE!");
514          } catch (DroppedSnapshotException dse) {
515            // What we are expecting
516            region.closing.set(false); // this is needed for the rest of the test to work
517          }
518          // Make it so all writes succeed from here on out
519          ffs.fault.set(false);
520          // Check sizes.  Should still be the one entry.
521          Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize());
522          // Now add two entries so that on this next flush that fails, we can see if we
523          // subtract the right amount, the snapshot size only.
524          Put p2 = new Put(row);
525          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
526          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
527          region.put(p2);
528          long expectedSize = sizeOfOnePut * 3;
529          Assert.assertEquals(expectedSize, region.getMemStoreDataSize());
530          // Do a successful flush.  It will clear the snapshot only.  Thats how flushes work.
531          // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
532          // it
533          region.flush(true);
534          // Make sure our memory accounting is right.
535          Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize());
536        } finally {
537          HBaseTestingUtility.closeRegionAndWAL(region);
538        }
539        return null;
540      }
541    });
542    FileSystem.closeAllForUGI(user.getUGI());
543  }
544
545  @Test
546  public void testCloseWithFailingFlush() throws Exception {
547    final Configuration conf = HBaseConfiguration.create(CONF);
548    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
549    // Only retry once.
550    conf.setInt("hbase.hstore.flush.retries.number", 1);
551    final User user =
552      User.createUserForTesting(conf, this.method, new String[]{"foo"});
553    // Inject our faulty LocalFileSystem
554    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
555    user.runAs(new PrivilegedExceptionAction<Object>() {
556      @Override
557      public Object run() throws Exception {
558        // Make sure it worked (above is sensitive to caching details in hadoop core)
559        FileSystem fs = FileSystem.get(conf);
560        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
561        FaultyFileSystem ffs = (FaultyFileSystem)fs;
562        HRegion region = null;
563        try {
564          // Initialize region
565          region = initHRegion(tableName, null, null, false,
566              Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
567          long size = region.getMemStoreDataSize();
568          Assert.assertEquals(0, size);
569          // Put one item into memstore.  Measure the size of one item in memstore.
570          Put p1 = new Put(row);
571          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
572          region.put(p1);
573          // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
574          HStore store = region.getStore(COLUMN_FAMILY_BYTES);
575          StoreFlushContext storeFlushCtx =
576              store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
577          storeFlushCtx.prepare();
578          // Now add two entries to the foreground memstore.
579          Put p2 = new Put(row);
580          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
581          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
582          region.put(p2);
583          // Now try close on top of a failing flush.
584          HBaseTestingUtility.closeRegionAndWAL(region);
585          region = null;
586          fail();
587        } catch (DroppedSnapshotException dse) {
588          // Expected
589          LOG.info("Expected DroppedSnapshotException");
590        } finally {
591          // Make it so all writes succeed from here on out so can close clean
592          ffs.fault.set(false);
593          HBaseTestingUtility.closeRegionAndWAL(region);
594        }
595        return null;
596      }
597    });
598    FileSystem.closeAllForUGI(user.getUGI());
599  }
600
601  @Test
602  public void testCompactionAffectedByScanners() throws Exception {
603    byte[] family = Bytes.toBytes("family");
604    this.region = initHRegion(tableName, method, CONF, family);
605
606    Put put = new Put(Bytes.toBytes("r1"));
607    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
608    region.put(put);
609    region.flush(true);
610
611    Scan scan = new Scan();
612    scan.setMaxVersions(3);
613    // open the first scanner
614    RegionScanner scanner1 = region.getScanner(scan);
615
616    Delete delete = new Delete(Bytes.toBytes("r1"));
617    region.delete(delete);
618    region.flush(true);
619
620    // open the second scanner
621    RegionScanner scanner2 = region.getScanner(scan);
622
623    List<Cell> results = new ArrayList<>();
624
625    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
626
627    // make a major compaction
628    region.compact(true);
629
630    // open the third scanner
631    RegionScanner scanner3 = region.getScanner(scan);
632
633    // get data from scanner 1, 2, 3 after major compaction
634    scanner1.next(results);
635    System.out.println(results);
636    assertEquals(1, results.size());
637
638    results.clear();
639    scanner2.next(results);
640    System.out.println(results);
641    assertEquals(0, results.size());
642
643    results.clear();
644    scanner3.next(results);
645    System.out.println(results);
646    assertEquals(0, results.size());
647  }
648
649  @Test
650  public void testToShowNPEOnRegionScannerReseek() throws Exception {
651    byte[] family = Bytes.toBytes("family");
652    this.region = initHRegion(tableName, method, CONF, family);
653
654    Put put = new Put(Bytes.toBytes("r1"));
655    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
656    region.put(put);
657    put = new Put(Bytes.toBytes("r2"));
658    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
659    region.put(put);
660    region.flush(true);
661
662    Scan scan = new Scan();
663    scan.setMaxVersions(3);
664    // open the first scanner
665    RegionScanner scanner1 = region.getScanner(scan);
666
667    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
668
669    region.compact(true);
670
671    scanner1.reseek(Bytes.toBytes("r2"));
672    List<Cell> results = new ArrayList<>();
673    scanner1.next(results);
674    Cell keyValue = results.get(0);
675    Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0);
676    scanner1.close();
677  }
678
679  @Test
680  public void testSkipRecoveredEditsReplay() throws Exception {
681    byte[] family = Bytes.toBytes("family");
682    this.region = initHRegion(tableName, method, CONF, family);
683    final WALFactory wals = new WALFactory(CONF, method);
684    try {
685      Path regiondir = region.getRegionFileSystem().getRegionDir();
686      FileSystem fs = region.getRegionFileSystem().getFileSystem();
687      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
688
689      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
690
691      long maxSeqId = 1050;
692      long minSeqId = 1000;
693
694      for (long i = minSeqId; i <= maxSeqId; i += 10) {
695        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
696        fs.create(recoveredEdits);
697        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
698
699        long time = System.nanoTime();
700        WALEdit edit = new WALEdit();
701        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
702            .toBytes(i)));
703        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
704            HConstants.DEFAULT_CLUSTER_ID), edit));
705
706        writer.close();
707      }
708      MonitoredTask status = TaskMonitor.get().createStatus(method);
709      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
710      for (HStore store : region.getStores()) {
711        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
712      }
713      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
714      assertEquals(maxSeqId, seqId);
715      region.getMVCC().advanceTo(seqId);
716      Get get = new Get(row);
717      Result result = region.get(get);
718      for (long i = minSeqId; i <= maxSeqId; i += 10) {
719        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
720        assertEquals(1, kvs.size());
721        assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
722      }
723    } finally {
724      HBaseTestingUtility.closeRegionAndWAL(this.region);
725      this.region = null;
726      wals.close();
727    }
728  }
729
730  @Test
731  public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
732    byte[] family = Bytes.toBytes("family");
733    this.region = initHRegion(tableName, method, CONF, family);
734    final WALFactory wals = new WALFactory(CONF, method);
735    try {
736      Path regiondir = region.getRegionFileSystem().getRegionDir();
737      FileSystem fs = region.getRegionFileSystem().getFileSystem();
738      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
739
740      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
741
742      long maxSeqId = 1050;
743      long minSeqId = 1000;
744
745      for (long i = minSeqId; i <= maxSeqId; i += 10) {
746        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
747        fs.create(recoveredEdits);
748        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
749
750        long time = System.nanoTime();
751        WALEdit edit = new WALEdit();
752        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
753            .toBytes(i)));
754        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
755            HConstants.DEFAULT_CLUSTER_ID), edit));
756
757        writer.close();
758      }
759      long recoverSeqId = 1030;
760      MonitoredTask status = TaskMonitor.get().createStatus(method);
761      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
762      for (HStore store : region.getStores()) {
763        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
764      }
765      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
766      assertEquals(maxSeqId, seqId);
767      region.getMVCC().advanceTo(seqId);
768      Get get = new Get(row);
769      Result result = region.get(get);
770      for (long i = minSeqId; i <= maxSeqId; i += 10) {
771        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
772        if (i < recoverSeqId) {
773          assertEquals(0, kvs.size());
774        } else {
775          assertEquals(1, kvs.size());
776          assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
777        }
778      }
779    } finally {
780      HBaseTestingUtility.closeRegionAndWAL(this.region);
781      this.region = null;
782      wals.close();
783    }
784  }
785
786  @Test
787  public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
788    byte[] family = Bytes.toBytes("family");
789    this.region = initHRegion(tableName, method, CONF, family);
790    Path regiondir = region.getRegionFileSystem().getRegionDir();
791    FileSystem fs = region.getRegionFileSystem().getFileSystem();
792
793    Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
794    for (int i = 1000; i < 1050; i += 10) {
795      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
796      FSDataOutputStream dos = fs.create(recoveredEdits);
797      dos.writeInt(i);
798      dos.close();
799    }
800    long minSeqId = 2000;
801    Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1));
802    FSDataOutputStream dos = fs.create(recoveredEdits);
803    dos.close();
804
805    Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
806    for (HStore store : region.getStores()) {
807      maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId);
808    }
809    long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
810    assertEquals(minSeqId, seqId);
811  }
812
813  @Test
814  public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
815    byte[] family = Bytes.toBytes("family");
816    this.region = initHRegion(tableName, method, CONF, family);
817    final WALFactory wals = new WALFactory(CONF, method);
818    try {
819      Path regiondir = region.getRegionFileSystem().getRegionDir();
820      FileSystem fs = region.getRegionFileSystem().getFileSystem();
821      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
822      byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]);
823
824      assertEquals(0, region.getStoreFileList(columns).size());
825
826      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
827
828      long maxSeqId = 1050;
829      long minSeqId = 1000;
830
831      for (long i = minSeqId; i <= maxSeqId; i += 10) {
832        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
833        fs.create(recoveredEdits);
834        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
835
836        long time = System.nanoTime();
837        WALEdit edit = null;
838        if (i == maxSeqId) {
839          edit = WALEdit.createCompaction(region.getRegionInfo(),
840          CompactionDescriptor.newBuilder()
841          .setTableName(ByteString.copyFrom(tableName.getName()))
842          .setFamilyName(ByteString.copyFrom(regionName))
843          .setEncodedRegionName(ByteString.copyFrom(regionName))
844          .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
845          .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
846          .build());
847        } else {
848          edit = new WALEdit();
849          edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
850            .toBytes(i)));
851        }
852        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
853            HConstants.DEFAULT_CLUSTER_ID), edit));
854        writer.close();
855      }
856
857      long recoverSeqId = 1030;
858      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
859      MonitoredTask status = TaskMonitor.get().createStatus(method);
860      for (HStore store : region.getStores()) {
861        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
862      }
863      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
864      assertEquals(maxSeqId, seqId);
865
866      // assert that the files are flushed
867      assertEquals(1, region.getStoreFileList(columns).size());
868
869    } finally {
870      HBaseTestingUtility.closeRegionAndWAL(this.region);
871      this.region = null;
872      wals.close();
873    }
874  }
875
876  @Test
877  public void testRecoveredEditsReplayCompaction() throws Exception {
878    testRecoveredEditsReplayCompaction(false);
879    testRecoveredEditsReplayCompaction(true);
880  }
881
882  public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
883    CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class);
884    byte[] family = Bytes.toBytes("family");
885    this.region = initHRegion(tableName, method, CONF, family);
886    final WALFactory wals = new WALFactory(CONF, method);
887    try {
888      Path regiondir = region.getRegionFileSystem().getRegionDir();
889      FileSystem fs = region.getRegionFileSystem().getFileSystem();
890      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
891
892      long maxSeqId = 3;
893      long minSeqId = 0;
894
895      for (long i = minSeqId; i < maxSeqId; i++) {
896        Put put = new Put(Bytes.toBytes(i));
897        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
898        region.put(put);
899        region.flush(true);
900      }
901
902      // this will create a region with 3 files
903      assertEquals(3, region.getStore(family).getStorefilesCount());
904      List<Path> storeFiles = new ArrayList<>(3);
905      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
906        storeFiles.add(sf.getPath());
907      }
908
909      // disable compaction completion
910      CONF.setBoolean("hbase.hstore.compaction.complete", false);
911      region.compactStores();
912
913      // ensure that nothing changed
914      assertEquals(3, region.getStore(family).getStorefilesCount());
915
916      // now find the compacted file, and manually add it to the recovered edits
917      Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
918      FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
919      String errorMsg = "Expected to find 1 file in the region temp directory "
920          + "from the compaction, could not find any";
921      assertNotNull(errorMsg, files);
922      assertEquals(errorMsg, 1, files.length);
923      // move the file inside region dir
924      Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
925          files[0].getPath());
926
927      byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes();
928      byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length];
929      for (int i=0; i < encodedNameAsBytes.length; i++) {
930        // Mix the byte array to have a new encodedName
931        fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1);
932      }
933
934      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
935        .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family,
936            storeFiles, Lists.newArrayList(newFile),
937            region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
938
939      WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
940          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
941
942      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
943
944      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
945      fs.create(recoveredEdits);
946      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
947
948      long time = System.nanoTime();
949
950      writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time,
951          HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
952          compactionDescriptor)));
953      writer.close();
954
955      // close the region now, and reopen again
956      region.getTableDescriptor();
957      region.getRegionInfo();
958      HBaseTestingUtility.closeRegionAndWAL(this.region);
959      try {
960        region = HRegion.openHRegion(region, null);
961      } catch (WrongRegionException wre) {
962        fail("Matching encoded region name should not have produced WrongRegionException");
963      }
964
965      // now check whether we have only one store file, the compacted one
966      Collection<HStoreFile> sfs = region.getStore(family).getStorefiles();
967      for (HStoreFile sf : sfs) {
968        LOG.info(Objects.toString(sf.getPath()));
969      }
970      if (!mismatchedRegionName) {
971        assertEquals(1, region.getStore(family).getStorefilesCount());
972      }
973      files = FSUtils.listStatus(fs, tmpDir);
974      assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);
975
976      for (long i = minSeqId; i < maxSeqId; i++) {
977        Get get = new Get(Bytes.toBytes(i));
978        Result result = region.get(get);
979        byte[] value = result.getValue(family, Bytes.toBytes(i));
980        assertArrayEquals(Bytes.toBytes(i), value);
981      }
982    } finally {
983      HBaseTestingUtility.closeRegionAndWAL(this.region);
984      this.region = null;
985      wals.close();
986      CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class);
987    }
988  }
989
990  @Test
991  public void testFlushMarkers() throws Exception {
992    // tests that flush markers are written to WAL and handled at recovered edits
993    byte[] family = Bytes.toBytes("family");
994    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
995    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
996    FSUtils.setRootDir(walConf, logDir);
997    final WALFactory wals = new WALFactory(walConf, method);
998    final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build());
999
1000    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1001      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1002    try {
1003      Path regiondir = region.getRegionFileSystem().getRegionDir();
1004      FileSystem fs = region.getRegionFileSystem().getFileSystem();
1005      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
1006
1007      long maxSeqId = 3;
1008      long minSeqId = 0;
1009
1010      for (long i = minSeqId; i < maxSeqId; i++) {
1011        Put put = new Put(Bytes.toBytes(i));
1012        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1013        region.put(put);
1014        region.flush(true);
1015      }
1016
1017      // this will create a region with 3 files from flush
1018      assertEquals(3, region.getStore(family).getStorefilesCount());
1019      List<String> storeFiles = new ArrayList<>(3);
1020      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
1021        storeFiles.add(sf.getPath().getName());
1022      }
1023
1024      // now verify that the flush markers are written
1025      wal.shutdown();
1026      WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
1027        TEST_UTIL.getConfiguration());
1028      try {
1029        List<WAL.Entry> flushDescriptors = new ArrayList<>();
1030        long lastFlushSeqId = -1;
1031        while (true) {
1032          WAL.Entry entry = reader.next();
1033          if (entry == null) {
1034            break;
1035          }
1036          Cell cell = entry.getEdit().getCells().get(0);
1037          if (WALEdit.isMetaEditFamily(cell)) {
1038            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
1039            assertNotNull(flushDesc);
1040            assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
1041            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1042              assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
1043            } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1044              assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
1045            }
1046            lastFlushSeqId = flushDesc.getFlushSequenceNumber();
1047            assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
1048            assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
1049            StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
1050            assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
1051            assertEquals("family", storeFlushDesc.getStoreHomeDir());
1052            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1053              assertEquals(0, storeFlushDesc.getFlushOutputCount());
1054            } else {
1055              assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
1056              assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
1057            }
1058
1059            flushDescriptors.add(entry);
1060          }
1061        }
1062
1063        assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
1064
1065        // now write those markers to the recovered edits again.
1066
1067        Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
1068
1069        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1070        fs.create(recoveredEdits);
1071        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1072
1073        for (WAL.Entry entry : flushDescriptors) {
1074          writer.append(entry);
1075        }
1076        writer.close();
1077      } finally {
1078        if (null != reader) {
1079          try {
1080            reader.close();
1081          } catch (IOException exception) {
1082            LOG.warn("Problem closing wal: " + exception.getMessage());
1083            LOG.debug("exception details", exception);
1084          }
1085        }
1086      }
1087
1088      // close the region now, and reopen again
1089      HBaseTestingUtility.closeRegionAndWAL(this.region);
1090      region = HRegion.openHRegion(region, null);
1091
1092      // now check whether we have can read back the data from region
1093      for (long i = minSeqId; i < maxSeqId; i++) {
1094        Get get = new Get(Bytes.toBytes(i));
1095        Result result = region.get(get);
1096        byte[] value = result.getValue(family, Bytes.toBytes(i));
1097        assertArrayEquals(Bytes.toBytes(i), value);
1098      }
1099    } finally {
1100      HBaseTestingUtility.closeRegionAndWAL(this.region);
1101      this.region = null;
1102      wals.close();
1103    }
1104  }
1105
1106  static class IsFlushWALMarker implements ArgumentMatcher<WALEdit> {
1107    volatile FlushAction[] actions;
1108    public IsFlushWALMarker(FlushAction... actions) {
1109      this.actions = actions;
1110    }
1111    @Override
1112    public boolean matches(WALEdit edit) {
1113      List<Cell> cells = edit.getCells();
1114      if (cells.isEmpty()) {
1115        return false;
1116      }
1117      if (WALEdit.isMetaEditFamily(cells.get(0))) {
1118        FlushDescriptor desc;
1119        try {
1120          desc = WALEdit.getFlushDescriptor(cells.get(0));
1121        } catch (IOException e) {
1122          LOG.warn(e.toString(), e);
1123          return false;
1124        }
1125        if (desc != null) {
1126          for (FlushAction action : actions) {
1127            if (desc.getAction() == action) {
1128              return true;
1129            }
1130          }
1131        }
1132      }
1133      return false;
1134    }
1135    public IsFlushWALMarker set(FlushAction... actions) {
1136      this.actions = actions;
1137      return this;
1138    }
1139  }
1140
1141  @Test
1142  public void testFlushMarkersWALFail() throws Exception {
1143    // test the cases where the WAL append for flush markers fail.
1144    byte[] family = Bytes.toBytes("family");
1145
1146    // spy an actual WAL implementation to throw exception (was not able to mock)
1147    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log");
1148
1149    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1150    FSUtils.setRootDir(walConf, logDir);
1151    // Make up a WAL that we can manipulate at append time.
1152    class FailAppendFlushMarkerWAL extends FSHLog {
1153      volatile FlushAction [] flushActions = null;
1154
1155      public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf)
1156      throws IOException {
1157        super(fs, root, logDir, conf);
1158      }
1159
1160      @Override
1161      protected Writer createWriterInstance(Path path) throws IOException {
1162        final Writer w = super.createWriterInstance(path);
1163        return new Writer() {
1164          @Override
1165          public void close() throws IOException {
1166            w.close();
1167          }
1168
1169          @Override
1170          public void sync(boolean forceSync) throws IOException {
1171            w.sync(forceSync);
1172          }
1173
1174          @Override
1175          public void append(Entry entry) throws IOException {
1176            List<Cell> cells = entry.getEdit().getCells();
1177            if (WALEdit.isMetaEditFamily(cells.get(0))) {
1178              FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0));
1179              if (desc != null) {
1180                for (FlushAction flushAction: flushActions) {
1181                  if (desc.getAction().equals(flushAction)) {
1182                    throw new IOException("Failed to append flush marker! " + flushAction);
1183                  }
1184                }
1185              }
1186            }
1187            w.append(entry);
1188          }
1189
1190          @Override
1191          public long getLength() {
1192            return w.getLength();
1193          }
1194        };
1195      }
1196    }
1197    FailAppendFlushMarkerWAL wal =
1198      new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1199        method, walConf);
1200    wal.init();
1201    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1202      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1203    int i = 0;
1204    Put put = new Put(Bytes.toBytes(i));
1205    put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
1206    put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1207    region.put(put);
1208
1209    // 1. Test case where START_FLUSH throws exception
1210    wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};
1211
1212    // start cache flush will throw exception
1213    try {
1214      region.flush(true);
1215      fail("This should have thrown exception");
1216    } catch (DroppedSnapshotException unexpected) {
1217      // this should not be a dropped snapshot exception. Meaning that RS will not abort
1218      throw unexpected;
1219    } catch (IOException expected) {
1220      // expected
1221    }
1222    // The WAL is hosed now. It has two edits appended. We cannot roll the log without it
1223    // throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
1224    region.close(true);
1225    wal.close();
1226
1227    // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
1228    wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
1229    wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1230          method, walConf);
1231    wal.init();
1232    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1233      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1234    region.put(put);
1235    // 3. Test case where ABORT_FLUSH will throw exception.
1236    // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
1237    // DroppedSnapshotException. Below COMMIT_FLUSH will cause flush to abort
1238    wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH};
1239
1240    try {
1241      region.flush(true);
1242      fail("This should have thrown exception");
1243    } catch (DroppedSnapshotException expected) {
1244      // we expect this exception, since we were able to write the snapshot, but failed to
1245      // write the flush marker to WAL
1246    } catch (IOException unexpected) {
1247      throw unexpected;
1248    }
1249  }
1250
1251  @Test
1252  public void testGetWhileRegionClose() throws IOException {
1253    Configuration hc = initSplit();
1254    int numRows = 100;
1255    byte[][] families = { fam1, fam2, fam3 };
1256
1257    // Setting up region
1258    this.region = initHRegion(tableName, method, hc, families);
1259    // Put data in region
1260    final int startRow = 100;
1261    putData(startRow, numRows, qual1, families);
1262    putData(startRow, numRows, qual2, families);
1263    putData(startRow, numRows, qual3, families);
1264    final AtomicBoolean done = new AtomicBoolean(false);
1265    final AtomicInteger gets = new AtomicInteger(0);
1266    GetTillDoneOrException[] threads = new GetTillDoneOrException[10];
1267    try {
1268      // Set ten threads running concurrently getting from the region.
1269      for (int i = 0; i < threads.length / 2; i++) {
1270        threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1271        threads[i].setDaemon(true);
1272        threads[i].start();
1273      }
1274      // Artificially make the condition by setting closing flag explicitly.
1275      // I can't make the issue happen with a call to region.close().
1276      this.region.closing.set(true);
1277      for (int i = threads.length / 2; i < threads.length; i++) {
1278        threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1279        threads[i].setDaemon(true);
1280        threads[i].start();
1281      }
1282    } finally {
1283      if (this.region != null) {
1284        HBaseTestingUtility.closeRegionAndWAL(this.region);
1285        this.region = null;
1286      }
1287    }
1288    done.set(true);
1289    for (GetTillDoneOrException t : threads) {
1290      try {
1291        t.join();
1292      } catch (InterruptedException e) {
1293        e.printStackTrace();
1294      }
1295      if (t.e != null) {
1296        LOG.info("Exception=" + t.e);
1297        assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
1298      }
1299    }
1300  }
1301
1302  /*
1303   * Thread that does get on single row until 'done' flag is flipped. If an
1304   * exception causes us to fail, it records it.
1305   */
1306  class GetTillDoneOrException extends Thread {
1307    private final Get g;
1308    private final AtomicBoolean done;
1309    private final AtomicInteger count;
1310    private Exception e;
1311
1312    GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d,
1313        final AtomicInteger c) {
1314      super("getter." + i);
1315      this.g = new Get(r);
1316      this.done = d;
1317      this.count = c;
1318    }
1319
1320    @Override
1321    public void run() {
1322      while (!this.done.get()) {
1323        try {
1324          assertTrue(region.get(g).size() > 0);
1325          this.count.incrementAndGet();
1326        } catch (Exception e) {
1327          this.e = e;
1328          break;
1329        }
1330      }
1331    }
1332  }
1333
1334  /*
1335   * An involved filter test. Has multiple column families and deletes in mix.
1336   */
1337  @Test
1338  public void testWeirdCacheBehaviour() throws Exception {
1339    final TableName tableName = TableName.valueOf(name.getMethodName());
1340    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
1341        Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
1342    this.region = initHRegion(tableName, method, CONF, FAMILIES);
1343    String value = "this is the value";
1344    String value2 = "this is some other value";
1345    String keyPrefix1 = "prefix1";
1346    String keyPrefix2 = "prefix2";
1347    String keyPrefix3 = "prefix3";
1348    putRows(this.region, 3, value, keyPrefix1);
1349    putRows(this.region, 3, value, keyPrefix2);
1350    putRows(this.region, 3, value, keyPrefix3);
1351    putRows(this.region, 3, value2, keyPrefix1);
1352    putRows(this.region, 3, value2, keyPrefix2);
1353    putRows(this.region, 3, value2, keyPrefix3);
1354    System.out.println("Checking values for key: " + keyPrefix1);
1355    assertEquals("Got back incorrect number of rows from scan", 3,
1356        getNumberOfRows(keyPrefix1, value2, this.region));
1357    System.out.println("Checking values for key: " + keyPrefix2);
1358    assertEquals("Got back incorrect number of rows from scan", 3,
1359        getNumberOfRows(keyPrefix2, value2, this.region));
1360    System.out.println("Checking values for key: " + keyPrefix3);
1361    assertEquals("Got back incorrect number of rows from scan", 3,
1362        getNumberOfRows(keyPrefix3, value2, this.region));
1363    deleteColumns(this.region, value2, keyPrefix1);
1364    deleteColumns(this.region, value2, keyPrefix2);
1365    deleteColumns(this.region, value2, keyPrefix3);
1366    System.out.println("Starting important checks.....");
1367    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0,
1368        getNumberOfRows(keyPrefix1, value2, this.region));
1369    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0,
1370        getNumberOfRows(keyPrefix2, value2, this.region));
1371    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0,
1372        getNumberOfRows(keyPrefix3, value2, this.region));
1373  }
1374
1375  @Test
1376  public void testAppendWithReadOnlyTable() throws Exception {
1377    final TableName tableName = TableName.valueOf(name.getMethodName());
1378    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1379    boolean exceptionCaught = false;
1380    Append append = new Append(Bytes.toBytes("somerow"));
1381    append.setDurability(Durability.SKIP_WAL);
1382    append.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"),
1383        Bytes.toBytes("somevalue"));
1384    try {
1385      region.append(append);
1386    } catch (IOException e) {
1387      exceptionCaught = true;
1388    }
1389    assertTrue(exceptionCaught == true);
1390  }
1391
1392  @Test
1393  public void testIncrWithReadOnlyTable() throws Exception {
1394    final TableName tableName = TableName.valueOf(name.getMethodName());
1395    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1396    boolean exceptionCaught = false;
1397    Increment inc = new Increment(Bytes.toBytes("somerow"));
1398    inc.setDurability(Durability.SKIP_WAL);
1399    inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);
1400    try {
1401      region.increment(inc);
1402    } catch (IOException e) {
1403      exceptionCaught = true;
1404    }
1405    assertTrue(exceptionCaught == true);
1406  }
1407
1408  private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException {
1409    InternalScanner scanner = buildScanner(keyPrefix, value, r);
1410    int count = 0;
1411    boolean more = false;
1412    List<Cell> results = new ArrayList<>();
1413    do {
1414      more = scanner.next(results);
1415      if (results != null && !results.isEmpty())
1416        count++;
1417      else
1418        break;
1419      Delete delete = new Delete(CellUtil.cloneRow(results.get(0)));
1420      delete.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
1421      r.delete(delete);
1422      results.clear();
1423    } while (more);
1424    assertEquals("Did not perform correct number of deletes", 3, count);
1425  }
1426
1427  private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception {
1428    InternalScanner resultScanner = buildScanner(keyPrefix, value, r);
1429    int numberOfResults = 0;
1430    List<Cell> results = new ArrayList<>();
1431    boolean more = false;
1432    do {
1433      more = resultScanner.next(results);
1434      if (results != null && !results.isEmpty())
1435        numberOfResults++;
1436      else
1437        break;
1438      for (Cell kv : results) {
1439        System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv)));
1440      }
1441      results.clear();
1442    } while (more);
1443    return numberOfResults;
1444  }
1445
1446  private InternalScanner buildScanner(String keyPrefix, String value, HRegion r)
1447      throws IOException {
1448    // Defaults FilterList.Operator.MUST_PASS_ALL.
1449    FilterList allFilters = new FilterList();
1450    allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
1451    // Only return rows where this column value exists in the row.
1452    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"),
1453        Bytes.toBytes("qual2"), CompareOperator.EQUAL, Bytes.toBytes(value));
1454    filter.setFilterIfMissing(true);
1455    allFilters.addFilter(filter);
1456    Scan scan = new Scan();
1457    scan.addFamily(Bytes.toBytes("trans-blob"));
1458    scan.addFamily(Bytes.toBytes("trans-type"));
1459    scan.addFamily(Bytes.toBytes("trans-date"));
1460    scan.addFamily(Bytes.toBytes("trans-tags"));
1461    scan.addFamily(Bytes.toBytes("trans-group"));
1462    scan.setFilter(allFilters);
1463    return r.getScanner(scan);
1464  }
1465
1466  private void putRows(HRegion r, int numRows, String value, String key) throws IOException {
1467    for (int i = 0; i < numRows; i++) {
1468      String row = key + "_" + i/* UUID.randomUUID().toString() */;
1469      System.out.println(String.format("Saving row: %s, with value %s", row, value));
1470      Put put = new Put(Bytes.toBytes(row));
1471      put.setDurability(Durability.SKIP_WAL);
1472      put.addColumn(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob"));
1473      put.addColumn(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
1474      put.addColumn(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999"));
1475      put.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value));
1476      put.addColumn(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId"));
1477      r.put(put);
1478    }
1479  }
1480
1481  @Test
1482  public void testFamilyWithAndWithoutColon() throws Exception {
1483    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1484    this.region = initHRegion(tableName, method, CONF, cf);
1485    Put p = new Put(tableName.toBytes());
1486    byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
1487    p.addColumn(cfwithcolon, cfwithcolon, cfwithcolon);
1488    boolean exception = false;
1489    try {
1490      this.region.put(p);
1491    } catch (NoSuchColumnFamilyException e) {
1492      exception = true;
1493    }
1494    assertTrue(exception);
1495  }
1496
1497  @Test
1498  public void testBatchPut_whileNoRowLocksHeld() throws IOException {
1499    final Put[] puts = new Put[10];
1500    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1501    long syncs = prepareRegionForBachPut(puts, source, false);
1502
1503    OperationStatus[] codes = this.region.batchMutate(puts);
1504    assertEquals(10, codes.length);
1505    for (int i = 0; i < 10; i++) {
1506      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1507    }
1508    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1509
1510    LOG.info("Next a batch put with one invalid family");
1511    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1512    codes = this.region.batchMutate(puts);
1513    assertEquals(10, codes.length);
1514    for (int i = 0; i < 10; i++) {
1515      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1516          codes[i].getOperationStatusCode());
1517    }
1518
1519    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
1520  }
1521
1522  @Test
1523  public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
1524    final Put[] puts = new Put[10];
1525    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1526    long syncs = prepareRegionForBachPut(puts, source, false);
1527
1528    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1529
1530    LOG.info("batchPut will have to break into four batches to avoid row locks");
1531    RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
1532    RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1"));
1533    RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
1534    RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);
1535
1536    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1537    final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>();
1538    final CountDownLatch startingPuts = new CountDownLatch(1);
1539    final CountDownLatch startingClose = new CountDownLatch(1);
1540    TestThread putter = new TestThread(ctx) {
1541      @Override
1542      public void doWork() throws IOException {
1543        startingPuts.countDown();
1544        retFromThread.set(region.batchMutate(puts));
1545      }
1546    };
1547    LOG.info("...starting put thread while holding locks");
1548    ctx.addThread(putter);
1549    ctx.startThreads();
1550
1551    // Now attempt to close the region from another thread.  Prior to HBASE-12565
1552    // this would cause the in-progress batchMutate operation to to fail with
1553    // exception because it use to release and re-acquire the close-guard lock
1554    // between batches.  Caller then didn't get status indicating which writes succeeded.
1555    // We now expect this thread to block until the batchMutate call finishes.
1556    Thread regionCloseThread = new TestThread(ctx) {
1557      @Override
1558      public void doWork() {
1559        try {
1560          startingPuts.await();
1561          // Give some time for the batch mutate to get in.
1562          // We don't want to race with the mutate
1563          Thread.sleep(10);
1564          startingClose.countDown();
1565          HBaseTestingUtility.closeRegionAndWAL(region);
1566          region = null;
1567        } catch (IOException e) {
1568          throw new RuntimeException(e);
1569        } catch (InterruptedException e) {
1570          throw new RuntimeException(e);
1571        }
1572      }
1573    };
1574    regionCloseThread.start();
1575
1576    startingClose.await();
1577    startingPuts.await();
1578    Thread.sleep(100);
1579    LOG.info("...releasing row lock 1, which should let put thread continue");
1580    rowLock1.release();
1581    rowLock2.release();
1582    rowLock3.release();
1583    waitForCounter(source, "syncTimeNumOps", syncs + 1);
1584
1585    LOG.info("...joining on put thread");
1586    ctx.stop();
1587    regionCloseThread.join();
1588
1589    OperationStatus[] codes = retFromThread.get();
1590    for (int i = 0; i < codes.length; i++) {
1591      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1592          codes[i].getOperationStatusCode());
1593    }
1594    rowLock4.release();
1595  }
1596
1597  private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
1598      throws InterruptedException {
1599    long startWait = System.currentTimeMillis();
1600    long currentCount;
1601    while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
1602      Thread.sleep(100);
1603      if (System.currentTimeMillis() - startWait > 10000) {
1604        fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
1605            expectedCount, currentCount));
1606      }
1607    }
1608  }
1609
1610  @Test
1611  public void testAtomicBatchPut() throws IOException {
1612    final Put[] puts = new Put[10];
1613    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1614    long syncs = prepareRegionForBachPut(puts, source, false);
1615
1616    // 1. Straight forward case, should succeed
1617    MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true,
1618        HConstants.NO_NONCE, HConstants.NO_NONCE);
1619    OperationStatus[] codes = this.region.batchMutate(batchOp);
1620    assertEquals(10, codes.length);
1621    for (int i = 0; i < 10; i++) {
1622      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1623    }
1624    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1625
1626    // 2. Failed to get lock
1627    RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
1628    // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this
1629    // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread
1630    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1631    final AtomicReference<IOException> retFromThread = new AtomicReference<>();
1632    final CountDownLatch finishedPuts = new CountDownLatch(1);
1633    final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
1634        HConstants
1635        .NO_NONCE,
1636        HConstants.NO_NONCE);
1637    TestThread putter = new TestThread(ctx) {
1638      @Override
1639      public void doWork() throws IOException {
1640        try {
1641          region.batchMutate(finalBatchOp);
1642        } catch (IOException ioe) {
1643          LOG.error("test failed!", ioe);
1644          retFromThread.set(ioe);
1645        }
1646        finishedPuts.countDown();
1647      }
1648    };
1649    LOG.info("...starting put thread while holding locks");
1650    ctx.addThread(putter);
1651    ctx.startThreads();
1652    LOG.info("...waiting for batch puts while holding locks");
1653    try {
1654      finishedPuts.await();
1655    } catch (InterruptedException e) {
1656      LOG.error("Interrupted!", e);
1657    } finally {
1658      if (lock != null) {
1659        lock.release();
1660      }
1661    }
1662    assertNotNull(retFromThread.get());
1663    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1664
1665    // 3. Exception thrown in validation
1666    LOG.info("Next a batch put with one invalid family");
1667    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1668    batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
1669        HConstants.NO_NONCE);
1670    thrown.expect(NoSuchColumnFamilyException.class);
1671    this.region.batchMutate(batchOp);
1672  }
1673
1674  @Test
1675  public void testBatchPutWithTsSlop() throws Exception {
1676    // add data with a timestamp that is too recent for range. Ensure assert
1677    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1678    final Put[] puts = new Put[10];
1679    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1680
1681    long syncs = prepareRegionForBachPut(puts, source, true);
1682
1683    OperationStatus[] codes = this.region.batchMutate(puts);
1684    assertEquals(10, codes.length);
1685    for (int i = 0; i < 10; i++) {
1686      assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
1687    }
1688    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1689  }
1690
1691  /**
1692   * @return syncs initial syncTimeNumOps
1693   */
1694  private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source,
1695      boolean slop) throws IOException {
1696    this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
1697
1698    LOG.info("First a batch put with all valid puts");
1699    for (int i = 0; i < puts.length; i++) {
1700      puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) :
1701          new Put(Bytes.toBytes("row_" + i));
1702      puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value);
1703    }
1704
1705    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1706    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1707    return syncs;
1708  }
1709
1710  // ////////////////////////////////////////////////////////////////////////////
1711  // checkAndMutate tests
1712  // ////////////////////////////////////////////////////////////////////////////
1713  @Test
1714  public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
1715    byte[] row1 = Bytes.toBytes("row1");
1716    byte[] fam1 = Bytes.toBytes("fam1");
1717    byte[] qf1 = Bytes.toBytes("qualifier");
1718    byte[] emptyVal = new byte[] {};
1719    byte[] val1 = Bytes.toBytes("value1");
1720    byte[] val2 = Bytes.toBytes("value2");
1721
1722    // Setting up region
1723    this.region = initHRegion(tableName, method, CONF, fam1);
1724    // Putting empty data in key
1725    Put put = new Put(row1);
1726    put.addColumn(fam1, qf1, emptyVal);
1727
1728    // checkAndPut with empty value
1729    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1730        new BinaryComparator(emptyVal), put);
1731    assertTrue(res);
1732
1733    // Putting data in key
1734    put = new Put(row1);
1735    put.addColumn(fam1, qf1, val1);
1736
1737    // checkAndPut with correct value
1738    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1739        new BinaryComparator(emptyVal), put);
1740    assertTrue(res);
1741
1742    // not empty anymore
1743    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1744        new BinaryComparator(emptyVal), put);
1745    assertFalse(res);
1746
1747    Delete delete = new Delete(row1);
1748    delete.addColumn(fam1, qf1);
1749    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1750        new BinaryComparator(emptyVal), delete);
1751    assertFalse(res);
1752
1753    put = new Put(row1);
1754    put.addColumn(fam1, qf1, val2);
1755    // checkAndPut with correct value
1756    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1757        new BinaryComparator(val1), put);
1758    assertTrue(res);
1759
1760    // checkAndDelete with correct value
1761    delete = new Delete(row1);
1762    delete.addColumn(fam1, qf1);
1763    delete.addColumn(fam1, qf1);
1764    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1765        new BinaryComparator(val2), delete);
1766    assertTrue(res);
1767
1768    delete = new Delete(row1);
1769    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1770        new BinaryComparator(emptyVal), delete);
1771    assertTrue(res);
1772
1773    // checkAndPut looking for a null value
1774    put = new Put(row1);
1775    put.addColumn(fam1, qf1, val1);
1776
1777    res = region
1778        .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put);
1779    assertTrue(res);
1780  }
1781
1782  @Test
1783  public void testCheckAndMutate_WithWrongValue() throws IOException {
1784    byte[] row1 = Bytes.toBytes("row1");
1785    byte[] fam1 = Bytes.toBytes("fam1");
1786    byte[] qf1 = Bytes.toBytes("qualifier");
1787    byte[] val1 = Bytes.toBytes("value1");
1788    byte[] val2 = Bytes.toBytes("value2");
1789    BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
1790    BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
1791
1792    // Setting up region
1793    this.region = initHRegion(tableName, method, CONF, fam1);
1794    // Putting data in key
1795    Put put = new Put(row1);
1796    put.addColumn(fam1, qf1, val1);
1797    region.put(put);
1798
1799    // checkAndPut with wrong value
1800    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1801        new BinaryComparator(val2), put);
1802    assertEquals(false, res);
1803
1804    // checkAndDelete with wrong value
1805    Delete delete = new Delete(row1);
1806    delete.addFamily(fam1);
1807    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1808        new BinaryComparator(val2), put);
1809    assertEquals(false, res);
1810
1811    // Putting data in key
1812    put = new Put(row1);
1813    put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1814    region.put(put);
1815
1816    // checkAndPut with wrong value
1817    res =
1818        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1819            new BigDecimalComparator(bd2), put);
1820    assertEquals(false, res);
1821
1822    // checkAndDelete with wrong value
1823    delete = new Delete(row1);
1824    delete.addFamily(fam1);
1825    res =
1826        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1827            new BigDecimalComparator(bd2), put);
1828    assertEquals(false, res);
1829  }
1830
1831  @Test
1832  public void testCheckAndMutate_WithCorrectValue() throws IOException {
1833    byte[] row1 = Bytes.toBytes("row1");
1834    byte[] fam1 = Bytes.toBytes("fam1");
1835    byte[] qf1 = Bytes.toBytes("qualifier");
1836    byte[] val1 = Bytes.toBytes("value1");
1837    BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
1838
1839    // Setting up region
1840    this.region = initHRegion(tableName, method, CONF, fam1);
1841    // Putting data in key
1842    Put put = new Put(row1);
1843    put.addColumn(fam1, qf1, val1);
1844    region.put(put);
1845
1846    // checkAndPut with correct value
1847    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1848        new BinaryComparator(val1), put);
1849    assertEquals(true, res);
1850
1851    // checkAndDelete with correct value
1852    Delete delete = new Delete(row1);
1853    delete.addColumn(fam1, qf1);
1854    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
1855        delete);
1856    assertEquals(true, res);
1857
1858    // Putting data in key
1859    put = new Put(row1);
1860    put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1861    region.put(put);
1862
1863    // checkAndPut with correct value
1864    res =
1865        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1866            bd1), put);
1867    assertEquals(true, res);
1868
1869    // checkAndDelete with correct value
1870    delete = new Delete(row1);
1871    delete.addColumn(fam1, qf1);
1872    res =
1873        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1874            bd1), delete);
1875    assertEquals(true, res);
1876  }
1877
1878  @Test
1879  public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
1880    byte[] row1 = Bytes.toBytes("row1");
1881    byte[] fam1 = Bytes.toBytes("fam1");
1882    byte[] qf1 = Bytes.toBytes("qualifier");
1883    byte[] val1 = Bytes.toBytes("value1");
1884    byte[] val2 = Bytes.toBytes("value2");
1885    byte[] val3 = Bytes.toBytes("value3");
1886    byte[] val4 = Bytes.toBytes("value4");
1887
1888    // Setting up region
1889    this.region = initHRegion(tableName, method, CONF, fam1);
1890    // Putting val3 in key
1891    Put put = new Put(row1);
1892    put.addColumn(fam1, qf1, val3);
1893    region.put(put);
1894
1895    // Test CompareOp.LESS: original = val3, compare with val3, fail
1896    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1897        new BinaryComparator(val3), put);
1898    assertEquals(false, res);
1899
1900    // Test CompareOp.LESS: original = val3, compare with val4, fail
1901    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1902        new BinaryComparator(val4), put);
1903    assertEquals(false, res);
1904
1905    // Test CompareOp.LESS: original = val3, compare with val2,
1906    // succeed (now value = val2)
1907    put = new Put(row1);
1908    put.addColumn(fam1, qf1, val2);
1909    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1910        new BinaryComparator(val2), put);
1911    assertEquals(true, res);
1912
1913    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
1914    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1915        new BinaryComparator(val3), put);
1916    assertEquals(false, res);
1917
1918    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
1919    // succeed (value still = val2)
1920    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1921        new BinaryComparator(val2), put);
1922    assertEquals(true, res);
1923
1924    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
1925    // succeed (now value = val3)
1926    put = new Put(row1);
1927    put.addColumn(fam1, qf1, val3);
1928    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1929        new BinaryComparator(val1), put);
1930    assertEquals(true, res);
1931
1932    // Test CompareOp.GREATER: original = val3, compare with val3, fail
1933    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
1934        new BinaryComparator(val3), put);
1935    assertEquals(false, res);
1936
1937    // Test CompareOp.GREATER: original = val3, compare with val2, fail
1938    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
1939        new BinaryComparator(val2), put);
1940    assertEquals(false, res);
1941
1942    // Test CompareOp.GREATER: original = val3, compare with val4,
1943    // succeed (now value = val2)
1944    put = new Put(row1);
1945    put.addColumn(fam1, qf1, val2);
1946    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
1947        new BinaryComparator(val4), put);
1948    assertEquals(true, res);
1949
1950    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
1951    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
1952        new BinaryComparator(val1), put);
1953    assertEquals(false, res);
1954
1955    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
1956    // succeed (value still = val2)
1957    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
1958        new BinaryComparator(val2), put);
1959    assertEquals(true, res);
1960
1961    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
1962    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
1963        new BinaryComparator(val3), put);
1964    assertEquals(true, res);
1965  }
1966
1967  @Test
1968  public void testCheckAndPut_ThatPutWasWritten() throws IOException {
1969    byte[] row1 = Bytes.toBytes("row1");
1970    byte[] fam1 = Bytes.toBytes("fam1");
1971    byte[] fam2 = Bytes.toBytes("fam2");
1972    byte[] qf1 = Bytes.toBytes("qualifier");
1973    byte[] val1 = Bytes.toBytes("value1");
1974    byte[] val2 = Bytes.toBytes("value2");
1975
1976    byte[][] families = { fam1, fam2 };
1977
1978    // Setting up region
1979    this.region = initHRegion(tableName, method, CONF, families);
1980    // Putting data in the key to check
1981    Put put = new Put(row1);
1982    put.addColumn(fam1, qf1, val1);
1983    region.put(put);
1984
1985    // Creating put to add
1986    long ts = System.currentTimeMillis();
1987    KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
1988    put = new Put(row1);
1989    put.add(kv);
1990
1991    // checkAndPut with wrong value
1992    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1993        new BinaryComparator(val1), put);
1994    assertEquals(true, res);
1995
1996    Get get = new Get(row1);
1997    get.addColumn(fam2, qf1);
1998    Cell[] actual = region.get(get).rawCells();
1999
2000    Cell[] expected = { kv };
2001
2002    assertEquals(expected.length, actual.length);
2003    for (int i = 0; i < actual.length; i++) {
2004      assertEquals(expected[i], actual[i]);
2005    }
2006  }
2007
2008  @Test
2009  public void testCheckAndPut_wrongRowInPut() throws IOException {
2010    this.region = initHRegion(tableName, method, CONF, COLUMNS);
2011    Put put = new Put(row2);
2012    put.addColumn(fam1, qual1, value1);
2013    try {
2014      region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL,
2015          new BinaryComparator(value2), put);
2016      fail();
2017    } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
2018      // expected exception.
2019    }
2020  }
2021
2022  @Test
2023  public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
2024    byte[] row1 = Bytes.toBytes("row1");
2025    byte[] fam1 = Bytes.toBytes("fam1");
2026    byte[] fam2 = Bytes.toBytes("fam2");
2027    byte[] qf1 = Bytes.toBytes("qualifier1");
2028    byte[] qf2 = Bytes.toBytes("qualifier2");
2029    byte[] qf3 = Bytes.toBytes("qualifier3");
2030    byte[] val1 = Bytes.toBytes("value1");
2031    byte[] val2 = Bytes.toBytes("value2");
2032    byte[] val3 = Bytes.toBytes("value3");
2033    byte[] emptyVal = new byte[] {};
2034
2035    byte[][] families = { fam1, fam2 };
2036
2037    // Setting up region
2038    this.region = initHRegion(tableName, method, CONF, families);
2039    // Put content
2040    Put put = new Put(row1);
2041    put.addColumn(fam1, qf1, val1);
2042    region.put(put);
2043    Threads.sleep(2);
2044
2045    put = new Put(row1);
2046    put.addColumn(fam1, qf1, val2);
2047    put.addColumn(fam2, qf1, val3);
2048    put.addColumn(fam2, qf2, val2);
2049    put.addColumn(fam2, qf3, val1);
2050    put.addColumn(fam1, qf3, val1);
2051    region.put(put);
2052
2053    // Multi-column delete
2054    Delete delete = new Delete(row1);
2055    delete.addColumn(fam1, qf1);
2056    delete.addColumn(fam2, qf1);
2057    delete.addColumn(fam1, qf3);
2058    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
2059        new BinaryComparator(val2), delete);
2060    assertEquals(true, res);
2061
2062    Get get = new Get(row1);
2063    get.addColumn(fam1, qf1);
2064    get.addColumn(fam1, qf3);
2065    get.addColumn(fam2, qf2);
2066    Result r = region.get(get);
2067    assertEquals(2, r.size());
2068    assertArrayEquals(val1, r.getValue(fam1, qf1));
2069    assertArrayEquals(val2, r.getValue(fam2, qf2));
2070
2071    // Family delete
2072    delete = new Delete(row1);
2073    delete.addFamily(fam2);
2074    res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL,
2075        new BinaryComparator(emptyVal), delete);
2076    assertEquals(true, res);
2077
2078    get = new Get(row1);
2079    r = region.get(get);
2080    assertEquals(1, r.size());
2081    assertArrayEquals(val1, r.getValue(fam1, qf1));
2082
2083    // Row delete
2084    delete = new Delete(row1);
2085    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
2086        delete);
2087    assertEquals(true, res);
2088    get = new Get(row1);
2089    r = region.get(get);
2090    assertEquals(0, r.size());
2091  }
2092
2093  // ////////////////////////////////////////////////////////////////////////////
2094  // Delete tests
2095  // ////////////////////////////////////////////////////////////////////////////
2096  @Test
2097  public void testDelete_multiDeleteColumn() throws IOException {
2098    byte[] row1 = Bytes.toBytes("row1");
2099    byte[] fam1 = Bytes.toBytes("fam1");
2100    byte[] qual = Bytes.toBytes("qualifier");
2101    byte[] value = Bytes.toBytes("value");
2102
2103    Put put = new Put(row1);
2104    put.addColumn(fam1, qual, 1, value);
2105    put.addColumn(fam1, qual, 2, value);
2106
2107    this.region = initHRegion(tableName, method, CONF, fam1);
2108    region.put(put);
2109
2110    // We do support deleting more than 1 'latest' version
2111    Delete delete = new Delete(row1);
2112    delete.addColumn(fam1, qual);
2113    delete.addColumn(fam1, qual);
2114    region.delete(delete);
2115
2116    Get get = new Get(row1);
2117    get.addFamily(fam1);
2118    Result r = region.get(get);
2119    assertEquals(0, r.size());
2120  }
2121
2122  @Test
2123  public void testDelete_CheckFamily() throws IOException {
2124    byte[] row1 = Bytes.toBytes("row1");
2125    byte[] fam1 = Bytes.toBytes("fam1");
2126    byte[] fam2 = Bytes.toBytes("fam2");
2127    byte[] fam3 = Bytes.toBytes("fam3");
2128    byte[] fam4 = Bytes.toBytes("fam4");
2129
2130    // Setting up region
2131    this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
2132    List<Cell> kvs = new ArrayList<>();
2133    kvs.add(new KeyValue(row1, fam4, null, null));
2134
2135    // testing existing family
2136    byte[] family = fam2;
2137    NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2138    deleteMap.put(family, kvs);
2139    region.delete(deleteMap, Durability.SYNC_WAL);
2140
2141    // testing non existing family
2142    boolean ok = false;
2143    family = fam4;
2144    try {
2145      deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2146      deleteMap.put(family, kvs);
2147      region.delete(deleteMap, Durability.SYNC_WAL);
2148    } catch (Exception e) {
2149      ok = true;
2150    }
2151    assertTrue("Family " + new String(family, StandardCharsets.UTF_8) + " does exist", ok);
2152  }
2153
2154  @Test
2155  public void testDelete_mixed() throws IOException, InterruptedException {
2156    byte[] fam = Bytes.toBytes("info");
2157    byte[][] families = { fam };
2158    this.region = initHRegion(tableName, method, CONF, families);
2159    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2160
2161    byte[] row = Bytes.toBytes("table_name");
2162    // column names
2163    byte[] serverinfo = Bytes.toBytes("serverinfo");
2164    byte[] splitA = Bytes.toBytes("splitA");
2165    byte[] splitB = Bytes.toBytes("splitB");
2166
2167    // add some data:
2168    Put put = new Put(row);
2169    put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2170    region.put(put);
2171
2172    put = new Put(row);
2173    put.addColumn(fam, splitB, Bytes.toBytes("reference_B"));
2174    region.put(put);
2175
2176    put = new Put(row);
2177    put.addColumn(fam, serverinfo, Bytes.toBytes("ip_address"));
2178    region.put(put);
2179
2180    // ok now delete a split:
2181    Delete delete = new Delete(row);
2182    delete.addColumns(fam, splitA);
2183    region.delete(delete);
2184
2185    // assert some things:
2186    Get get = new Get(row).addColumn(fam, serverinfo);
2187    Result result = region.get(get);
2188    assertEquals(1, result.size());
2189
2190    get = new Get(row).addColumn(fam, splitA);
2191    result = region.get(get);
2192    assertEquals(0, result.size());
2193
2194    get = new Get(row).addColumn(fam, splitB);
2195    result = region.get(get);
2196    assertEquals(1, result.size());
2197
2198    // Assert that after a delete, I can put.
2199    put = new Put(row);
2200    put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2201    region.put(put);
2202    get = new Get(row);
2203    result = region.get(get);
2204    assertEquals(3, result.size());
2205
2206    // Now delete all... then test I can add stuff back
2207    delete = new Delete(row);
2208    region.delete(delete);
2209    assertEquals(0, region.get(get).size());
2210
2211    region.put(new Put(row).addColumn(fam, splitA, Bytes.toBytes("reference_A")));
2212    result = region.get(get);
2213    assertEquals(1, result.size());
2214  }
2215
2216  @Test
2217  public void testDeleteRowWithFutureTs() throws IOException {
2218    byte[] fam = Bytes.toBytes("info");
2219    byte[][] families = { fam };
2220    this.region = initHRegion(tableName, method, CONF, families);
2221    byte[] row = Bytes.toBytes("table_name");
2222    // column names
2223    byte[] serverinfo = Bytes.toBytes("serverinfo");
2224
2225    // add data in the far future
2226    Put put = new Put(row);
2227    put.addColumn(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value"));
2228    region.put(put);
2229
2230    // now delete something in the present
2231    Delete delete = new Delete(row);
2232    region.delete(delete);
2233
2234    // make sure we still see our data
2235    Get get = new Get(row).addColumn(fam, serverinfo);
2236    Result result = region.get(get);
2237    assertEquals(1, result.size());
2238
2239    // delete the future row
2240    delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3);
2241    region.delete(delete);
2242
2243    // make sure it is gone
2244    get = new Get(row).addColumn(fam, serverinfo);
2245    result = region.get(get);
2246    assertEquals(0, result.size());
2247  }
2248
2249  /**
2250   * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
2251   * the actual timestamp
2252   */
2253  @Test
2254  public void testPutWithLatestTS() throws IOException {
2255    byte[] fam = Bytes.toBytes("info");
2256    byte[][] families = { fam };
2257    this.region = initHRegion(tableName, method, CONF, families);
2258    byte[] row = Bytes.toBytes("row1");
2259    // column names
2260    byte[] qual = Bytes.toBytes("qual");
2261
2262    // add data with LATEST_TIMESTAMP, put without WAL
2263    Put put = new Put(row);
2264    put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2265    region.put(put);
2266
2267    // Make sure it shows up with an actual timestamp
2268    Get get = new Get(row).addColumn(fam, qual);
2269    Result result = region.get(get);
2270    assertEquals(1, result.size());
2271    Cell kv = result.rawCells()[0];
2272    LOG.info("Got: " + kv);
2273    assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2274        kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2275
2276    // Check same with WAL enabled (historically these took different
2277    // code paths, so check both)
2278    row = Bytes.toBytes("row2");
2279    put = new Put(row);
2280    put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2281    region.put(put);
2282
2283    // Make sure it shows up with an actual timestamp
2284    get = new Get(row).addColumn(fam, qual);
2285    result = region.get(get);
2286    assertEquals(1, result.size());
2287    kv = result.rawCells()[0];
2288    LOG.info("Got: " + kv);
2289    assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2290        kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2291  }
2292
2293  /**
2294   * Tests that there is server-side filtering for invalid timestamp upper
2295   * bound. Note that the timestamp lower bound is automatically handled for us
2296   * by the TTL field.
2297   */
2298  @Test
2299  public void testPutWithTsSlop() throws IOException {
2300    byte[] fam = Bytes.toBytes("info");
2301    byte[][] families = { fam };
2302
2303    // add data with a timestamp that is too recent for range. Ensure assert
2304    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
2305    this.region = initHRegion(tableName, method, CONF, families);
2306    boolean caughtExcep = false;
2307    try {
2308      // no TS specified == use latest. should not error
2309      region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), Bytes.toBytes("value")));
2310      // TS out of range. should error
2311      region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"),
2312          System.currentTimeMillis() + 2000, Bytes.toBytes("value")));
2313      fail("Expected IOE for TS out of configured timerange");
2314    } catch (FailedSanityCheckException ioe) {
2315      LOG.debug("Received expected exception", ioe);
2316      caughtExcep = true;
2317    }
2318    assertTrue("Should catch FailedSanityCheckException", caughtExcep);
2319  }
2320
2321  @Test
2322  public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
2323    byte[] fam1 = Bytes.toBytes("columnA");
2324    byte[] fam2 = Bytes.toBytes("columnB");
2325    this.region = initHRegion(tableName, method, CONF, fam1, fam2);
2326    byte[] rowA = Bytes.toBytes("rowA");
2327    byte[] rowB = Bytes.toBytes("rowB");
2328
2329    byte[] value = Bytes.toBytes("value");
2330
2331    Delete delete = new Delete(rowA);
2332    delete.addFamily(fam1);
2333
2334    region.delete(delete);
2335
2336    // now create data.
2337    Put put = new Put(rowA);
2338    put.addColumn(fam2, null, value);
2339    region.put(put);
2340
2341    put = new Put(rowB);
2342    put.addColumn(fam1, null, value);
2343    put.addColumn(fam2, null, value);
2344    region.put(put);
2345
2346    Scan scan = new Scan();
2347    scan.addFamily(fam1).addFamily(fam2);
2348    InternalScanner s = region.getScanner(scan);
2349    List<Cell> results = new ArrayList<>();
2350    s.next(results);
2351    assertTrue(CellUtil.matchingRows(results.get(0), rowA));
2352
2353    results.clear();
2354    s.next(results);
2355    assertTrue(CellUtil.matchingRows(results.get(0), rowB));
2356  }
2357
2358  @Test
2359  public void testDataInMemoryWithoutWAL() throws IOException {
2360    FileSystem fs = FileSystem.get(CONF);
2361    Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
2362    FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
2363    hLog.init();
2364    // This chunk creation is done throughout the code base. Do we want to move it into core?
2365    // It is missing from this test. W/o it we NPE.
2366    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
2367        COLUMN_FAMILY_BYTES);
2368
2369    Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2370      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1);
2371    final long originalSize = originalCell.getSerializedSize();
2372
2373    Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2374      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx"));
2375    final long addSize = addCell.getSerializedSize();
2376
2377    LOG.info("originalSize:" + originalSize
2378      + ", addSize:" + addSize);
2379    // start test. We expect that the addPut's durability will be replaced
2380    // by originalPut's durability.
2381
2382    // case 1:
2383    testDataInMemoryWithoutWAL(region,
2384            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2385            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2386            originalSize + addSize);
2387
2388    // case 2:
2389    testDataInMemoryWithoutWAL(region,
2390            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2391            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2392            originalSize + addSize);
2393
2394    // case 3:
2395    testDataInMemoryWithoutWAL(region,
2396            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2397            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2398            0);
2399
2400    // case 4:
2401    testDataInMemoryWithoutWAL(region,
2402            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2403            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2404            0);
2405  }
2406
2407  private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut,
2408          final Put addPut, long delta) throws IOException {
2409    final long initSize = region.getDataInMemoryWithoutWAL();
2410    // save normalCPHost and replaced by mockedCPHost
2411    RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
2412    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
2413    // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must
2414    // do below format (from Mockito doc).
2415    Mockito.doAnswer(new Answer<Void>() {
2416      @Override
2417      public Void answer(InvocationOnMock invocation) throws Throwable {
2418        MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0);
2419        mb.addOperationsFromCP(0, new Mutation[]{addPut});
2420        return null;
2421      }
2422    }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
2423    ColumnFamilyDescriptorBuilder builder = ColumnFamilyDescriptorBuilder.
2424        newBuilder(COLUMN_FAMILY_BYTES);
2425    ScanInfo info = new ScanInfo(CONF, builder.build(), Long.MAX_VALUE,
2426        Long.MAX_VALUE, region.getCellComparator());
2427    Mockito.when(mockedCPHost.preFlushScannerOpen(Mockito.any(HStore.class),
2428        Mockito.any())).thenReturn(info);
2429    Mockito.when(mockedCPHost.preFlush(Mockito.any(), Mockito.any(StoreScanner.class),
2430        Mockito.any())).thenAnswer(i -> i.getArgument(1));
2431    region.setCoprocessorHost(mockedCPHost);
2432
2433    region.put(originalPut);
2434    region.setCoprocessorHost(normalCPHost);
2435    final long finalSize = region.getDataInMemoryWithoutWAL();
2436    assertEquals("finalSize:" + finalSize + ", initSize:"
2437      + initSize + ", delta:" + delta,finalSize, initSize + delta);
2438  }
2439
2440  @Test
2441  public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
2442    Delete delete = new Delete(row);
2443    delete.addColumns(fam1, qual1);
2444    doTestDelete_AndPostInsert(delete);
2445  }
2446
2447  @Test
2448  public void testaddFamily_PostInsert() throws IOException, InterruptedException {
2449    Delete delete = new Delete(row);
2450    delete.addFamily(fam1);
2451    doTestDelete_AndPostInsert(delete);
2452  }
2453
2454  public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
2455    this.region = initHRegion(tableName, method, CONF, fam1);
2456    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2457    Put put = new Put(row);
2458    put.addColumn(fam1, qual1, value1);
2459    region.put(put);
2460
2461    // now delete the value:
2462    region.delete(delete);
2463
2464    // ok put data:
2465    put = new Put(row);
2466    put.addColumn(fam1, qual1, value2);
2467    region.put(put);
2468
2469    // ok get:
2470    Get get = new Get(row);
2471    get.addColumn(fam1, qual1);
2472
2473    Result r = region.get(get);
2474    assertEquals(1, r.size());
2475    assertArrayEquals(value2, r.getValue(fam1, qual1));
2476
2477    // next:
2478    Scan scan = new Scan(row);
2479    scan.addColumn(fam1, qual1);
2480    InternalScanner s = region.getScanner(scan);
2481
2482    List<Cell> results = new ArrayList<>();
2483    assertEquals(false, s.next(results));
2484    assertEquals(1, results.size());
2485    Cell kv = results.get(0);
2486
2487    assertArrayEquals(value2, CellUtil.cloneValue(kv));
2488    assertArrayEquals(fam1, CellUtil.cloneFamily(kv));
2489    assertArrayEquals(qual1, CellUtil.cloneQualifier(kv));
2490    assertArrayEquals(row, CellUtil.cloneRow(kv));
2491  }
2492
2493  @Test
2494  public void testDelete_CheckTimestampUpdated() throws IOException {
2495    byte[] row1 = Bytes.toBytes("row1");
2496    byte[] col1 = Bytes.toBytes("col1");
2497    byte[] col2 = Bytes.toBytes("col2");
2498    byte[] col3 = Bytes.toBytes("col3");
2499
2500    // Setting up region
2501    this.region = initHRegion(tableName, method, CONF, fam1);
2502    // Building checkerList
2503    List<Cell> kvs = new ArrayList<>();
2504    kvs.add(new KeyValue(row1, fam1, col1, null));
2505    kvs.add(new KeyValue(row1, fam1, col2, null));
2506    kvs.add(new KeyValue(row1, fam1, col3, null));
2507
2508    NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2509    deleteMap.put(fam1, kvs);
2510    region.delete(deleteMap, Durability.SYNC_WAL);
2511
2512    // extract the key values out the memstore:
2513    // This is kinda hacky, but better than nothing...
2514    long now = System.currentTimeMillis();
2515    AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore;
2516    Cell firstCell = memstore.getActive().first();
2517    assertTrue(firstCell.getTimestamp() <= now);
2518    now = firstCell.getTimestamp();
2519    for (Cell cell : memstore.getActive().getCellSet()) {
2520      assertTrue(cell.getTimestamp() <= now);
2521      now = cell.getTimestamp();
2522    }
2523  }
2524
2525  // ////////////////////////////////////////////////////////////////////////////
2526  // Get tests
2527  // ////////////////////////////////////////////////////////////////////////////
2528  @Test
2529  public void testGet_FamilyChecker() throws IOException {
2530    byte[] row1 = Bytes.toBytes("row1");
2531    byte[] fam1 = Bytes.toBytes("fam1");
2532    byte[] fam2 = Bytes.toBytes("False");
2533    byte[] col1 = Bytes.toBytes("col1");
2534
2535    // Setting up region
2536    this.region = initHRegion(tableName, method, CONF, fam1);
2537    Get get = new Get(row1);
2538    get.addColumn(fam2, col1);
2539
2540    // Test
2541    try {
2542      region.get(get);
2543      fail("Expecting DoNotRetryIOException in get but did not get any");
2544    } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
2545      LOG.info("Got expected DoNotRetryIOException successfully");
2546    }
2547  }
2548
2549  @Test
2550  public void testGet_Basic() throws IOException {
2551    byte[] row1 = Bytes.toBytes("row1");
2552    byte[] fam1 = Bytes.toBytes("fam1");
2553    byte[] col1 = Bytes.toBytes("col1");
2554    byte[] col2 = Bytes.toBytes("col2");
2555    byte[] col3 = Bytes.toBytes("col3");
2556    byte[] col4 = Bytes.toBytes("col4");
2557    byte[] col5 = Bytes.toBytes("col5");
2558
2559    // Setting up region
2560    this.region = initHRegion(tableName, method, CONF, fam1);
2561    // Add to memstore
2562    Put put = new Put(row1);
2563    put.addColumn(fam1, col1, null);
2564    put.addColumn(fam1, col2, null);
2565    put.addColumn(fam1, col3, null);
2566    put.addColumn(fam1, col4, null);
2567    put.addColumn(fam1, col5, null);
2568    region.put(put);
2569
2570    Get get = new Get(row1);
2571    get.addColumn(fam1, col2);
2572    get.addColumn(fam1, col4);
2573    // Expected result
2574    KeyValue kv1 = new KeyValue(row1, fam1, col2);
2575    KeyValue kv2 = new KeyValue(row1, fam1, col4);
2576    KeyValue[] expected = { kv1, kv2 };
2577
2578    // Test
2579    Result res = region.get(get);
2580    assertEquals(expected.length, res.size());
2581    for (int i = 0; i < res.size(); i++) {
2582      assertTrue(CellUtil.matchingRows(expected[i], res.rawCells()[i]));
2583      assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i]));
2584      assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i]));
2585    }
2586
2587    // Test using a filter on a Get
2588    Get g = new Get(row1);
2589    final int count = 2;
2590    g.setFilter(new ColumnCountGetFilter(count));
2591    res = region.get(g);
2592    assertEquals(count, res.size());
2593  }
2594
2595  @Test
2596  public void testGet_Empty() throws IOException {
2597    byte[] row = Bytes.toBytes("row");
2598    byte[] fam = Bytes.toBytes("fam");
2599
2600    this.region = initHRegion(tableName, method, CONF, fam);
2601    Get get = new Get(row);
2602    get.addFamily(fam);
2603    Result r = region.get(get);
2604
2605    assertTrue(r.isEmpty());
2606  }
2607
2608  @Test
2609  public void testGetWithFilter() throws IOException, InterruptedException {
2610    byte[] row1 = Bytes.toBytes("row1");
2611    byte[] fam1 = Bytes.toBytes("fam1");
2612    byte[] col1 = Bytes.toBytes("col1");
2613    byte[] value1 = Bytes.toBytes("value1");
2614    byte[] value2 = Bytes.toBytes("value2");
2615
2616    final int maxVersions = 3;
2617    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
2618    hcd.setMaxVersions(maxVersions);
2619    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker"));
2620    htd.addFamily(hcd);
2621    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2622    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
2623    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
2624    final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
2625    this.region = TEST_UTIL.createLocalHRegion(info, htd, wal);
2626
2627    // Put 4 version to memstore
2628    long ts = 0;
2629    Put put = new Put(row1, ts);
2630    put.addColumn(fam1, col1, value1);
2631    region.put(put);
2632    put = new Put(row1, ts + 1);
2633    put.addColumn(fam1, col1, Bytes.toBytes("filter1"));
2634    region.put(put);
2635    put = new Put(row1, ts + 2);
2636    put.addColumn(fam1, col1, Bytes.toBytes("filter2"));
2637    region.put(put);
2638    put = new Put(row1, ts + 3);
2639    put.addColumn(fam1, col1, value2);
2640    region.put(put);
2641
2642    Get get = new Get(row1);
2643    get.readAllVersions();
2644    Result res = region.get(get);
2645    // Get 3 versions, the oldest version has gone from user view
2646    assertEquals(maxVersions, res.size());
2647
2648    get.setFilter(new ValueFilter(CompareOperator.EQUAL, new SubstringComparator("value")));
2649    res = region.get(get);
2650    // When use value filter, the oldest version should still gone from user view and it
2651    // should only return one key vaule
2652    assertEquals(1, res.size());
2653    assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2654    assertEquals(ts + 3, res.rawCells()[0].getTimestamp());
2655
2656    region.flush(true);
2657    region.compact(true);
2658    Thread.sleep(1000);
2659    res = region.get(get);
2660    // After flush and compact, the result should be consistent with previous result
2661    assertEquals(1, res.size());
2662    assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2663  }
2664
2665  // ////////////////////////////////////////////////////////////////////////////
2666  // Scanner tests
2667  // ////////////////////////////////////////////////////////////////////////////
2668  @Test
2669  public void testGetScanner_WithOkFamilies() throws IOException {
2670    byte[] fam1 = Bytes.toBytes("fam1");
2671    byte[] fam2 = Bytes.toBytes("fam2");
2672
2673    byte[][] families = { fam1, fam2 };
2674
2675    // Setting up region
2676    this.region = initHRegion(tableName, method, CONF, families);
2677    Scan scan = new Scan();
2678    scan.addFamily(fam1);
2679    scan.addFamily(fam2);
2680    try {
2681      region.getScanner(scan);
2682    } catch (Exception e) {
2683      assertTrue("Families could not be found in Region", false);
2684    }
2685  }
2686
2687  @Test
2688  public void testGetScanner_WithNotOkFamilies() throws IOException {
2689    byte[] fam1 = Bytes.toBytes("fam1");
2690    byte[] fam2 = Bytes.toBytes("fam2");
2691
2692    byte[][] families = { fam1 };
2693
2694    // Setting up region
2695    this.region = initHRegion(tableName, method, CONF, families);
2696    Scan scan = new Scan();
2697    scan.addFamily(fam2);
2698    boolean ok = false;
2699    try {
2700      region.getScanner(scan);
2701    } catch (Exception e) {
2702      ok = true;
2703    }
2704    assertTrue("Families could not be found in Region", ok);
2705  }
2706
2707  @Test
2708  public void testGetScanner_WithNoFamilies() throws IOException {
2709    byte[] row1 = Bytes.toBytes("row1");
2710    byte[] fam1 = Bytes.toBytes("fam1");
2711    byte[] fam2 = Bytes.toBytes("fam2");
2712    byte[] fam3 = Bytes.toBytes("fam3");
2713    byte[] fam4 = Bytes.toBytes("fam4");
2714
2715    byte[][] families = { fam1, fam2, fam3, fam4 };
2716
2717    // Setting up region
2718    this.region = initHRegion(tableName, method, CONF, families);
2719    // Putting data in Region
2720    Put put = new Put(row1);
2721    put.addColumn(fam1, null, null);
2722    put.addColumn(fam2, null, null);
2723    put.addColumn(fam3, null, null);
2724    put.addColumn(fam4, null, null);
2725    region.put(put);
2726
2727    Scan scan = null;
2728    HRegion.RegionScannerImpl is = null;
2729
2730    // Testing to see how many scanners that is produced by getScanner,
2731    // starting
2732    // with known number, 2 - current = 1
2733    scan = new Scan();
2734    scan.addFamily(fam2);
2735    scan.addFamily(fam4);
2736    is = region.getScanner(scan);
2737    assertEquals(1, is.storeHeap.getHeap().size());
2738
2739    scan = new Scan();
2740    is = region.getScanner(scan);
2741    assertEquals(families.length - 1, is.storeHeap.getHeap().size());
2742  }
2743
2744  /**
2745   * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
2746   *
2747   * @throws IOException
2748   */
2749  @Test
2750  public void testGetScanner_WithRegionClosed() throws IOException {
2751    byte[] fam1 = Bytes.toBytes("fam1");
2752    byte[] fam2 = Bytes.toBytes("fam2");
2753
2754    byte[][] families = { fam1, fam2 };
2755
2756    // Setting up region
2757    try {
2758      this.region = initHRegion(tableName, method, CONF, families);
2759    } catch (IOException e) {
2760      e.printStackTrace();
2761      fail("Got IOException during initHRegion, " + e.getMessage());
2762    }
2763    region.closed.set(true);
2764    try {
2765      region.getScanner(null);
2766      fail("Expected to get an exception during getScanner on a region that is closed");
2767    } catch (NotServingRegionException e) {
2768      // this is the correct exception that is expected
2769    } catch (IOException e) {
2770      fail("Got wrong type of exception - should be a NotServingRegionException, " +
2771          "but was an IOException: "
2772          + e.getMessage());
2773    }
2774  }
2775
2776  @Test
2777  public void testRegionScanner_Next() throws IOException {
2778    byte[] row1 = Bytes.toBytes("row1");
2779    byte[] row2 = Bytes.toBytes("row2");
2780    byte[] fam1 = Bytes.toBytes("fam1");
2781    byte[] fam2 = Bytes.toBytes("fam2");
2782    byte[] fam3 = Bytes.toBytes("fam3");
2783    byte[] fam4 = Bytes.toBytes("fam4");
2784
2785    byte[][] families = { fam1, fam2, fam3, fam4 };
2786    long ts = System.currentTimeMillis();
2787
2788    // Setting up region
2789    this.region = initHRegion(tableName, method, CONF, families);
2790    // Putting data in Region
2791    Put put = null;
2792    put = new Put(row1);
2793    put.addColumn(fam1, (byte[]) null, ts, null);
2794    put.addColumn(fam2, (byte[]) null, ts, null);
2795    put.addColumn(fam3, (byte[]) null, ts, null);
2796    put.addColumn(fam4, (byte[]) null, ts, null);
2797    region.put(put);
2798
2799    put = new Put(row2);
2800    put.addColumn(fam1, (byte[]) null, ts, null);
2801    put.addColumn(fam2, (byte[]) null, ts, null);
2802    put.addColumn(fam3, (byte[]) null, ts, null);
2803    put.addColumn(fam4, (byte[]) null, ts, null);
2804    region.put(put);
2805
2806    Scan scan = new Scan();
2807    scan.addFamily(fam2);
2808    scan.addFamily(fam4);
2809    InternalScanner is = region.getScanner(scan);
2810
2811    List<Cell> res = null;
2812
2813    // Result 1
2814    List<Cell> expected1 = new ArrayList<>();
2815    expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
2816    expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
2817
2818    res = new ArrayList<>();
2819    is.next(res);
2820    for (int i = 0; i < res.size(); i++) {
2821      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected1.get(i), res.get(i)));
2822    }
2823
2824    // Result 2
2825    List<Cell> expected2 = new ArrayList<>();
2826    expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
2827    expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
2828
2829    res = new ArrayList<>();
2830    is.next(res);
2831    for (int i = 0; i < res.size(); i++) {
2832      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected2.get(i), res.get(i)));
2833    }
2834  }
2835
2836  @Test
2837  public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException {
2838    byte[] row1 = Bytes.toBytes("row1");
2839    byte[] qf1 = Bytes.toBytes("qualifier1");
2840    byte[] qf2 = Bytes.toBytes("qualifier2");
2841    byte[] fam1 = Bytes.toBytes("fam1");
2842    byte[][] families = { fam1 };
2843
2844    long ts1 = System.currentTimeMillis();
2845    long ts2 = ts1 + 1;
2846    long ts3 = ts1 + 2;
2847
2848    // Setting up region
2849    this.region = initHRegion(tableName, method, CONF, families);
2850    // Putting data in Region
2851    Put put = null;
2852    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2853    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2854    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2855
2856    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2857    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2858    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2859
2860    put = new Put(row1);
2861    put.add(kv13);
2862    put.add(kv12);
2863    put.add(kv11);
2864    put.add(kv23);
2865    put.add(kv22);
2866    put.add(kv21);
2867    region.put(put);
2868
2869    // Expected
2870    List<Cell> expected = new ArrayList<>();
2871    expected.add(kv13);
2872    expected.add(kv12);
2873
2874    Scan scan = new Scan(row1);
2875    scan.addColumn(fam1, qf1);
2876    scan.setMaxVersions(MAX_VERSIONS);
2877    List<Cell> actual = new ArrayList<>();
2878    InternalScanner scanner = region.getScanner(scan);
2879
2880    boolean hasNext = scanner.next(actual);
2881    assertEquals(false, hasNext);
2882
2883    // Verify result
2884    for (int i = 0; i < expected.size(); i++) {
2885      assertEquals(expected.get(i), actual.get(i));
2886    }
2887  }
2888
2889  @Test
2890  public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException {
2891    byte[] row1 = Bytes.toBytes("row1");
2892    byte[] qf1 = Bytes.toBytes("qualifier1");
2893    byte[] qf2 = Bytes.toBytes("qualifier2");
2894    byte[] fam1 = Bytes.toBytes("fam1");
2895    byte[][] families = { fam1 };
2896
2897    long ts1 = 1; // System.currentTimeMillis();
2898    long ts2 = ts1 + 1;
2899    long ts3 = ts1 + 2;
2900
2901    // Setting up region
2902    this.region = initHRegion(tableName, method, CONF, families);
2903    // Putting data in Region
2904    Put put = null;
2905    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2906    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2907    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2908
2909    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2910    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2911    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2912
2913    put = new Put(row1);
2914    put.add(kv13);
2915    put.add(kv12);
2916    put.add(kv11);
2917    put.add(kv23);
2918    put.add(kv22);
2919    put.add(kv21);
2920    region.put(put);
2921    region.flush(true);
2922
2923    // Expected
2924    List<Cell> expected = new ArrayList<>();
2925    expected.add(kv13);
2926    expected.add(kv12);
2927    expected.add(kv23);
2928    expected.add(kv22);
2929
2930    Scan scan = new Scan(row1);
2931    scan.addColumn(fam1, qf1);
2932    scan.addColumn(fam1, qf2);
2933    scan.setMaxVersions(MAX_VERSIONS);
2934    List<Cell> actual = new ArrayList<>();
2935    InternalScanner scanner = region.getScanner(scan);
2936
2937    boolean hasNext = scanner.next(actual);
2938    assertEquals(false, hasNext);
2939
2940    // Verify result
2941    for (int i = 0; i < expected.size(); i++) {
2942      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
2943    }
2944  }
2945
2946  @Test
2947  public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws
2948      IOException {
2949    byte[] row1 = Bytes.toBytes("row1");
2950    byte[] fam1 = Bytes.toBytes("fam1");
2951    byte[][] families = { fam1 };
2952    byte[] qf1 = Bytes.toBytes("qualifier1");
2953    byte[] qf2 = Bytes.toBytes("qualifier2");
2954
2955    long ts1 = 1;
2956    long ts2 = ts1 + 1;
2957    long ts3 = ts1 + 2;
2958    long ts4 = ts1 + 3;
2959
2960    // Setting up region
2961    this.region = initHRegion(tableName, method, CONF, families);
2962    // Putting data in Region
2963    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
2964    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2965    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2966    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2967
2968    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
2969    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2970    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2971    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2972
2973    Put put = null;
2974    put = new Put(row1);
2975    put.add(kv14);
2976    put.add(kv24);
2977    region.put(put);
2978    region.flush(true);
2979
2980    put = new Put(row1);
2981    put.add(kv23);
2982    put.add(kv13);
2983    region.put(put);
2984    region.flush(true);
2985
2986    put = new Put(row1);
2987    put.add(kv22);
2988    put.add(kv12);
2989    region.put(put);
2990    region.flush(true);
2991
2992    put = new Put(row1);
2993    put.add(kv21);
2994    put.add(kv11);
2995    region.put(put);
2996
2997    // Expected
2998    List<Cell> expected = new ArrayList<>();
2999    expected.add(kv14);
3000    expected.add(kv13);
3001    expected.add(kv12);
3002    expected.add(kv24);
3003    expected.add(kv23);
3004    expected.add(kv22);
3005
3006    Scan scan = new Scan(row1);
3007    scan.addColumn(fam1, qf1);
3008    scan.addColumn(fam1, qf2);
3009    int versions = 3;
3010    scan.setMaxVersions(versions);
3011    List<Cell> actual = new ArrayList<>();
3012    InternalScanner scanner = region.getScanner(scan);
3013
3014    boolean hasNext = scanner.next(actual);
3015    assertEquals(false, hasNext);
3016
3017    // Verify result
3018    for (int i = 0; i < expected.size(); i++) {
3019      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3020    }
3021  }
3022
3023  @Test
3024  public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException {
3025    byte[] row1 = Bytes.toBytes("row1");
3026    byte[] qf1 = Bytes.toBytes("qualifier1");
3027    byte[] qf2 = Bytes.toBytes("qualifier2");
3028    byte[] fam1 = Bytes.toBytes("fam1");
3029    byte[][] families = { fam1 };
3030
3031    long ts1 = System.currentTimeMillis();
3032    long ts2 = ts1 + 1;
3033    long ts3 = ts1 + 2;
3034
3035    // Setting up region
3036    this.region = initHRegion(tableName, method, CONF, families);
3037    // Putting data in Region
3038    Put put = null;
3039    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3040    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3041    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3042
3043    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3044    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3045    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3046
3047    put = new Put(row1);
3048    put.add(kv13);
3049    put.add(kv12);
3050    put.add(kv11);
3051    put.add(kv23);
3052    put.add(kv22);
3053    put.add(kv21);
3054    region.put(put);
3055
3056    // Expected
3057    List<Cell> expected = new ArrayList<>();
3058    expected.add(kv13);
3059    expected.add(kv12);
3060    expected.add(kv23);
3061    expected.add(kv22);
3062
3063    Scan scan = new Scan(row1);
3064    scan.addFamily(fam1);
3065    scan.setMaxVersions(MAX_VERSIONS);
3066    List<Cell> actual = new ArrayList<>();
3067    InternalScanner scanner = region.getScanner(scan);
3068
3069    boolean hasNext = scanner.next(actual);
3070    assertEquals(false, hasNext);
3071
3072    // Verify result
3073    for (int i = 0; i < expected.size(); i++) {
3074      assertEquals(expected.get(i), actual.get(i));
3075    }
3076  }
3077
3078  @Test
3079  public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException {
3080    byte[] row1 = Bytes.toBytes("row1");
3081    byte[] qf1 = Bytes.toBytes("qualifier1");
3082    byte[] qf2 = Bytes.toBytes("qualifier2");
3083    byte[] fam1 = Bytes.toBytes("fam1");
3084
3085    long ts1 = 1; // System.currentTimeMillis();
3086    long ts2 = ts1 + 1;
3087    long ts3 = ts1 + 2;
3088
3089    // Setting up region
3090    this.region = initHRegion(tableName, method, CONF, fam1);
3091    // Putting data in Region
3092    Put put = null;
3093    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3094    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3095    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3096
3097    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3098    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3099    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3100
3101    put = new Put(row1);
3102    put.add(kv13);
3103    put.add(kv12);
3104    put.add(kv11);
3105    put.add(kv23);
3106    put.add(kv22);
3107    put.add(kv21);
3108    region.put(put);
3109    region.flush(true);
3110
3111    // Expected
3112    List<Cell> expected = new ArrayList<>();
3113    expected.add(kv13);
3114    expected.add(kv12);
3115    expected.add(kv23);
3116    expected.add(kv22);
3117
3118    Scan scan = new Scan(row1);
3119    scan.addFamily(fam1);
3120    scan.setMaxVersions(MAX_VERSIONS);
3121    List<Cell> actual = new ArrayList<>();
3122    InternalScanner scanner = region.getScanner(scan);
3123
3124    boolean hasNext = scanner.next(actual);
3125    assertEquals(false, hasNext);
3126
3127    // Verify result
3128    for (int i = 0; i < expected.size(); i++) {
3129      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3130    }
3131  }
3132
3133  @Test
3134  public void testScanner_StopRow1542() throws IOException {
3135    byte[] family = Bytes.toBytes("testFamily");
3136    this.region = initHRegion(tableName, method, CONF, family);
3137    byte[] row1 = Bytes.toBytes("row111");
3138    byte[] row2 = Bytes.toBytes("row222");
3139    byte[] row3 = Bytes.toBytes("row333");
3140    byte[] row4 = Bytes.toBytes("row444");
3141    byte[] row5 = Bytes.toBytes("row555");
3142
3143    byte[] col1 = Bytes.toBytes("Pub111");
3144    byte[] col2 = Bytes.toBytes("Pub222");
3145
3146    Put put = new Put(row1);
3147    put.addColumn(family, col1, Bytes.toBytes(10L));
3148    region.put(put);
3149
3150    put = new Put(row2);
3151    put.addColumn(family, col1, Bytes.toBytes(15L));
3152    region.put(put);
3153
3154    put = new Put(row3);
3155    put.addColumn(family, col2, Bytes.toBytes(20L));
3156    region.put(put);
3157
3158    put = new Put(row4);
3159    put.addColumn(family, col2, Bytes.toBytes(30L));
3160    region.put(put);
3161
3162    put = new Put(row5);
3163    put.addColumn(family, col1, Bytes.toBytes(40L));
3164    region.put(put);
3165
3166    Scan scan = new Scan(row3, row4);
3167    scan.setMaxVersions();
3168    scan.addColumn(family, col1);
3169    InternalScanner s = region.getScanner(scan);
3170
3171    List<Cell> results = new ArrayList<>();
3172    assertEquals(false, s.next(results));
3173    assertEquals(0, results.size());
3174  }
3175
3176  @Test
3177  public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
3178    byte[] row1 = Bytes.toBytes("row1");
3179    byte[] fam1 = Bytes.toBytes("fam1");
3180    byte[] qf1 = Bytes.toBytes("qualifier1");
3181    byte[] qf2 = Bytes.toBytes("quateslifier2");
3182
3183    long ts1 = 1;
3184    long ts2 = ts1 + 1;
3185    long ts3 = ts1 + 2;
3186    long ts4 = ts1 + 3;
3187
3188    // Setting up region
3189    this.region = initHRegion(tableName, method, CONF, fam1);
3190    // Putting data in Region
3191    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3192    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3193    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3194    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3195
3196    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3197    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3198    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3199    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3200
3201    Put put = null;
3202    put = new Put(row1);
3203    put.add(kv14);
3204    put.add(kv24);
3205    region.put(put);
3206    region.flush(true);
3207
3208    put = new Put(row1);
3209    put.add(kv23);
3210    put.add(kv13);
3211    region.put(put);
3212    region.flush(true);
3213
3214    put = new Put(row1);
3215    put.add(kv22);
3216    put.add(kv12);
3217    region.put(put);
3218    region.flush(true);
3219
3220    put = new Put(row1);
3221    put.add(kv21);
3222    put.add(kv11);
3223    region.put(put);
3224
3225    // Expected
3226    List<KeyValue> expected = new ArrayList<>();
3227    expected.add(kv14);
3228    expected.add(kv13);
3229    expected.add(kv12);
3230    expected.add(kv24);
3231    expected.add(kv23);
3232    expected.add(kv22);
3233
3234    Scan scan = new Scan(row1);
3235    int versions = 3;
3236    scan.setMaxVersions(versions);
3237    List<Cell> actual = new ArrayList<>();
3238    InternalScanner scanner = region.getScanner(scan);
3239
3240    boolean hasNext = scanner.next(actual);
3241    assertEquals(false, hasNext);
3242
3243    // Verify result
3244    for (int i = 0; i < expected.size(); i++) {
3245      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3246    }
3247  }
3248
3249  /**
3250   * Added for HBASE-5416
3251   *
3252   * Here we test scan optimization when only subset of CFs are used in filter
3253   * conditions.
3254   */
3255  @Test
3256  public void testScanner_JoinedScanners() throws IOException {
3257    byte[] cf_essential = Bytes.toBytes("essential");
3258    byte[] cf_joined = Bytes.toBytes("joined");
3259    byte[] cf_alpha = Bytes.toBytes("alpha");
3260    this.region = initHRegion(tableName, method, CONF, cf_essential, cf_joined, cf_alpha);
3261    byte[] row1 = Bytes.toBytes("row1");
3262    byte[] row2 = Bytes.toBytes("row2");
3263    byte[] row3 = Bytes.toBytes("row3");
3264
3265    byte[] col_normal = Bytes.toBytes("d");
3266    byte[] col_alpha = Bytes.toBytes("a");
3267
3268    byte[] filtered_val = Bytes.toBytes(3);
3269
3270    Put put = new Put(row1);
3271    put.addColumn(cf_essential, col_normal, Bytes.toBytes(1));
3272    put.addColumn(cf_joined, col_alpha, Bytes.toBytes(1));
3273    region.put(put);
3274
3275    put = new Put(row2);
3276    put.addColumn(cf_essential, col_alpha, Bytes.toBytes(2));
3277    put.addColumn(cf_joined, col_normal, Bytes.toBytes(2));
3278    put.addColumn(cf_alpha, col_alpha, Bytes.toBytes(2));
3279    region.put(put);
3280
3281    put = new Put(row3);
3282    put.addColumn(cf_essential, col_normal, filtered_val);
3283    put.addColumn(cf_joined, col_normal, filtered_val);
3284    region.put(put);
3285
3286    // Check two things:
3287    // 1. result list contains expected values
3288    // 2. result list is sorted properly
3289
3290    Scan scan = new Scan();
3291    Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
3292            CompareOperator.NOT_EQUAL, filtered_val);
3293    scan.setFilter(filter);
3294    scan.setLoadColumnFamiliesOnDemand(true);
3295    InternalScanner s = region.getScanner(scan);
3296
3297    List<Cell> results = new ArrayList<>();
3298    assertTrue(s.next(results));
3299    assertEquals(1, results.size());
3300    results.clear();
3301
3302    assertTrue(s.next(results));
3303    assertEquals(3, results.size());
3304    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha));
3305    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential));
3306    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined));
3307    results.clear();
3308
3309    assertFalse(s.next(results));
3310    assertEquals(0, results.size());
3311  }
3312
3313  /**
3314   * HBASE-5416
3315   *
3316   * Test case when scan limits amount of KVs returned on each next() call.
3317   */
3318  @Test
3319  public void testScanner_JoinedScannersWithLimits() throws IOException {
3320    final byte[] cf_first = Bytes.toBytes("first");
3321    final byte[] cf_second = Bytes.toBytes("second");
3322
3323    this.region = initHRegion(tableName, method, CONF, cf_first, cf_second);
3324    final byte[] col_a = Bytes.toBytes("a");
3325    final byte[] col_b = Bytes.toBytes("b");
3326
3327    Put put;
3328
3329    for (int i = 0; i < 10; i++) {
3330      put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
3331      put.addColumn(cf_first, col_a, Bytes.toBytes(i));
3332      if (i < 5) {
3333        put.addColumn(cf_first, col_b, Bytes.toBytes(i));
3334        put.addColumn(cf_second, col_a, Bytes.toBytes(i));
3335        put.addColumn(cf_second, col_b, Bytes.toBytes(i));
3336      }
3337      region.put(put);
3338    }
3339
3340    Scan scan = new Scan();
3341    scan.setLoadColumnFamiliesOnDemand(true);
3342    Filter bogusFilter = new FilterBase() {
3343      @Override
3344      public ReturnCode filterCell(final Cell ignored) throws IOException {
3345        return ReturnCode.INCLUDE;
3346      }
3347      @Override
3348      public boolean isFamilyEssential(byte[] name) {
3349        return Bytes.equals(name, cf_first);
3350      }
3351    };
3352
3353    scan.setFilter(bogusFilter);
3354    InternalScanner s = region.getScanner(scan);
3355
3356    // Our data looks like this:
3357    // r0: first:a, first:b, second:a, second:b
3358    // r1: first:a, first:b, second:a, second:b
3359    // r2: first:a, first:b, second:a, second:b
3360    // r3: first:a, first:b, second:a, second:b
3361    // r4: first:a, first:b, second:a, second:b
3362    // r5: first:a
3363    // r6: first:a
3364    // r7: first:a
3365    // r8: first:a
3366    // r9: first:a
3367
3368    // But due to next's limit set to 3, we should get this:
3369    // r0: first:a, first:b, second:a
3370    // r0: second:b
3371    // r1: first:a, first:b, second:a
3372    // r1: second:b
3373    // r2: first:a, first:b, second:a
3374    // r2: second:b
3375    // r3: first:a, first:b, second:a
3376    // r3: second:b
3377    // r4: first:a, first:b, second:a
3378    // r4: second:b
3379    // r5: first:a
3380    // r6: first:a
3381    // r7: first:a
3382    // r8: first:a
3383    // r9: first:a
3384
3385    List<Cell> results = new ArrayList<>();
3386    int index = 0;
3387    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build();
3388    while (true) {
3389      boolean more = s.next(results, scannerContext);
3390      if ((index >> 1) < 5) {
3391        if (index % 2 == 0) {
3392          assertEquals(3, results.size());
3393        } else {
3394          assertEquals(1, results.size());
3395        }
3396      } else {
3397        assertEquals(1, results.size());
3398      }
3399      results.clear();
3400      index++;
3401      if (!more) {
3402        break;
3403      }
3404    }
3405  }
3406
3407  /**
3408   * Write an HFile block full with Cells whose qualifier that are identical between
3409   * 0 and Short.MAX_VALUE. See HBASE-13329.
3410   * @throws Exception
3411   */
3412  @Test
3413  public void testLongQualifier() throws Exception {
3414    byte[] family = Bytes.toBytes("family");
3415    this.region = initHRegion(tableName, method, CONF, family);
3416    byte[] q = new byte[Short.MAX_VALUE+2];
3417    Arrays.fill(q, 0, q.length-1, (byte)42);
3418    for (byte i=0; i<10; i++) {
3419      Put p = new Put(Bytes.toBytes("row"));
3420      // qualifiers that differ past Short.MAX_VALUE
3421      q[q.length-1]=i;
3422      p.addColumn(family, q, q);
3423      region.put(p);
3424    }
3425    region.flush(false);
3426  }
3427
3428  /**
3429   * Flushes the cache in a thread while scanning. The tests verify that the
3430   * scan is coherent - e.g. the returned results are always of the same or
3431   * later update as the previous results.
3432   *
3433   * @throws IOException
3434   *           scan / compact
3435   * @throws InterruptedException
3436   *           thread join
3437   */
3438  @Test
3439  public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
3440    byte[] family = Bytes.toBytes("family");
3441    int numRows = 1000;
3442    int flushAndScanInterval = 10;
3443    int compactInterval = 10 * flushAndScanInterval;
3444
3445    this.region = initHRegion(tableName, method, CONF, family);
3446    FlushThread flushThread = new FlushThread();
3447    try {
3448      flushThread.start();
3449
3450      Scan scan = new Scan();
3451      scan.addFamily(family);
3452      scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOperator.EQUAL,
3453          new BinaryComparator(Bytes.toBytes(5L))));
3454
3455      int expectedCount = 0;
3456      List<Cell> res = new ArrayList<>();
3457
3458      boolean toggle = true;
3459      for (long i = 0; i < numRows; i++) {
3460        Put put = new Put(Bytes.toBytes(i));
3461        put.setDurability(Durability.SKIP_WAL);
3462        put.addColumn(family, qual1, Bytes.toBytes(i % 10));
3463        region.put(put);
3464
3465        if (i != 0 && i % compactInterval == 0) {
3466          LOG.debug("iteration = " + i+ " ts="+System.currentTimeMillis());
3467          region.compact(true);
3468        }
3469
3470        if (i % 10 == 5L) {
3471          expectedCount++;
3472        }
3473
3474        if (i != 0 && i % flushAndScanInterval == 0) {
3475          res.clear();
3476          InternalScanner scanner = region.getScanner(scan);
3477          if (toggle) {
3478            flushThread.flush();
3479          }
3480          while (scanner.next(res))
3481            ;
3482          if (!toggle) {
3483            flushThread.flush();
3484          }
3485          assertEquals("toggle="+toggle+"i=" + i + " ts="+System.currentTimeMillis(),
3486              expectedCount, res.size());
3487          toggle = !toggle;
3488        }
3489      }
3490
3491    } finally {
3492      try {
3493        flushThread.done();
3494        flushThread.join();
3495        flushThread.checkNoError();
3496      } catch (InterruptedException ie) {
3497        LOG.warn("Caught exception when joining with flushThread", ie);
3498      }
3499      HBaseTestingUtility.closeRegionAndWAL(this.region);
3500      this.region = null;
3501    }
3502  }
3503
3504  protected class FlushThread extends Thread {
3505    private volatile boolean done;
3506    private Throwable error = null;
3507
3508    FlushThread() {
3509      super("FlushThread");
3510    }
3511
3512    public void done() {
3513      done = true;
3514      synchronized (this) {
3515        interrupt();
3516      }
3517    }
3518
3519    public void checkNoError() {
3520      if (error != null) {
3521        assertNull(error);
3522      }
3523    }
3524
3525    @Override
3526    public void run() {
3527      done = false;
3528      while (!done) {
3529        synchronized (this) {
3530          try {
3531            wait();
3532          } catch (InterruptedException ignored) {
3533            if (done) {
3534              break;
3535            }
3536          }
3537        }
3538        try {
3539          region.flush(true);
3540        } catch (IOException e) {
3541          if (!done) {
3542            LOG.error("Error while flushing cache", e);
3543            error = e;
3544          }
3545          break;
3546        } catch (Throwable t) {
3547          LOG.error("Uncaught exception", t);
3548          throw t;
3549        }
3550      }
3551    }
3552
3553    public void flush() {
3554      synchronized (this) {
3555        notify();
3556      }
3557    }
3558  }
3559
3560  /**
3561   * Writes very wide records and scans for the latest every time.. Flushes and
3562   * compacts the region every now and then to keep things realistic.
3563   *
3564   * @throws IOException
3565   *           by flush / scan / compaction
3566   * @throws InterruptedException
3567   *           when joining threads
3568   */
3569  @Test
3570  public void testWritesWhileScanning() throws IOException, InterruptedException {
3571    int testCount = 100;
3572    int numRows = 1;
3573    int numFamilies = 10;
3574    int numQualifiers = 100;
3575    int flushInterval = 7;
3576    int compactInterval = 5 * flushInterval;
3577    byte[][] families = new byte[numFamilies][];
3578    for (int i = 0; i < numFamilies; i++) {
3579      families[i] = Bytes.toBytes("family" + i);
3580    }
3581    byte[][] qualifiers = new byte[numQualifiers][];
3582    for (int i = 0; i < numQualifiers; i++) {
3583      qualifiers[i] = Bytes.toBytes("qual" + i);
3584    }
3585
3586    this.region = initHRegion(tableName, method, CONF, families);
3587    FlushThread flushThread = new FlushThread();
3588    PutThread putThread = new PutThread(numRows, families, qualifiers);
3589    try {
3590      putThread.start();
3591      putThread.waitForFirstPut();
3592
3593      flushThread.start();
3594
3595      Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
3596
3597      int expectedCount = numFamilies * numQualifiers;
3598      List<Cell> res = new ArrayList<>();
3599
3600      long prevTimestamp = 0L;
3601      for (int i = 0; i < testCount; i++) {
3602
3603        if (i != 0 && i % compactInterval == 0) {
3604          region.compact(true);
3605          for (HStore store : region.getStores()) {
3606            store.closeAndArchiveCompactedFiles();
3607          }
3608        }
3609
3610        if (i != 0 && i % flushInterval == 0) {
3611          flushThread.flush();
3612        }
3613
3614        boolean previousEmpty = res.isEmpty();
3615        res.clear();
3616        try (InternalScanner scanner = region.getScanner(scan)) {
3617          boolean moreRows;
3618          do {
3619            moreRows = scanner.next(res);
3620          } while (moreRows);
3621        }
3622        if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
3623          assertEquals("i=" + i, expectedCount, res.size());
3624          long timestamp = res.get(0).getTimestamp();
3625          assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
3626              timestamp >= prevTimestamp);
3627          prevTimestamp = timestamp;
3628        }
3629      }
3630
3631      putThread.done();
3632
3633      region.flush(true);
3634
3635    } finally {
3636      try {
3637        flushThread.done();
3638        flushThread.join();
3639        flushThread.checkNoError();
3640
3641        putThread.join();
3642        putThread.checkNoError();
3643      } catch (InterruptedException ie) {
3644        LOG.warn("Caught exception when joining with flushThread", ie);
3645      }
3646
3647      try {
3648          HBaseTestingUtility.closeRegionAndWAL(this.region);
3649      } catch (DroppedSnapshotException dse) {
3650        // We could get this on way out because we interrupt the background flusher and it could
3651        // fail anywhere causing a DSE over in the background flusher... only it is not properly
3652        // dealt with so could still be memory hanging out when we get to here -- memory we can't
3653        // flush because the accounting is 'off' since original DSE.
3654      }
3655      this.region = null;
3656    }
3657  }
3658
3659  protected class PutThread extends Thread {
3660    private volatile boolean done;
3661    private volatile int numPutsFinished = 0;
3662
3663    private Throwable error = null;
3664    private int numRows;
3665    private byte[][] families;
3666    private byte[][] qualifiers;
3667
3668    private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
3669      super("PutThread");
3670      this.numRows = numRows;
3671      this.families = families;
3672      this.qualifiers = qualifiers;
3673    }
3674
3675    /**
3676     * Block calling thread until this instance of PutThread has put at least one row.
3677     */
3678    public void waitForFirstPut() throws InterruptedException {
3679      // wait until put thread actually puts some data
3680      while (isAlive() && numPutsFinished == 0) {
3681        checkNoError();
3682        Thread.sleep(50);
3683      }
3684    }
3685
3686    public void done() {
3687      done = true;
3688      synchronized (this) {
3689        interrupt();
3690      }
3691    }
3692
3693    public void checkNoError() {
3694      if (error != null) {
3695        assertNull(error);
3696      }
3697    }
3698
3699    @Override
3700    public void run() {
3701      done = false;
3702      while (!done) {
3703        try {
3704          for (int r = 0; r < numRows; r++) {
3705            byte[] row = Bytes.toBytes("row" + r);
3706            Put put = new Put(row);
3707            put.setDurability(Durability.SKIP_WAL);
3708            byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
3709            for (byte[] family : families) {
3710              for (byte[] qualifier : qualifiers) {
3711                put.addColumn(family, qualifier, numPutsFinished, value);
3712              }
3713            }
3714            region.put(put);
3715            numPutsFinished++;
3716            if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
3717              LOG.debug("put iteration = {}", numPutsFinished);
3718              Delete delete = new Delete(row, (long) numPutsFinished - 30);
3719              region.delete(delete);
3720            }
3721            numPutsFinished++;
3722          }
3723        } catch (InterruptedIOException e) {
3724          // This is fine. It means we are done, or didn't get the lock on time
3725          LOG.info("Interrupted", e);
3726        } catch (IOException e) {
3727          LOG.error("Error while putting records", e);
3728          error = e;
3729          break;
3730        }
3731      }
3732
3733    }
3734
3735  }
3736
3737  /**
3738   * Writes very wide records and gets the latest row every time.. Flushes and
3739   * compacts the region aggressivly to catch issues.
3740   *
3741   * @throws IOException
3742   *           by flush / scan / compaction
3743   * @throws InterruptedException
3744   *           when joining threads
3745   */
3746  @Test
3747  public void testWritesWhileGetting() throws Exception {
3748    int testCount = 50;
3749    int numRows = 1;
3750    int numFamilies = 10;
3751    int numQualifiers = 100;
3752    int compactInterval = 100;
3753    byte[][] families = new byte[numFamilies][];
3754    for (int i = 0; i < numFamilies; i++) {
3755      families[i] = Bytes.toBytes("family" + i);
3756    }
3757    byte[][] qualifiers = new byte[numQualifiers][];
3758    for (int i = 0; i < numQualifiers; i++) {
3759      qualifiers[i] = Bytes.toBytes("qual" + i);
3760    }
3761
3762
3763    // This test flushes constantly and can cause many files to be created,
3764    // possibly
3765    // extending over the ulimit. Make sure compactions are aggressive in
3766    // reducing
3767    // the number of HFiles created.
3768    Configuration conf = HBaseConfiguration.create(CONF);
3769    conf.setInt("hbase.hstore.compaction.min", 1);
3770    conf.setInt("hbase.hstore.compaction.max", 1000);
3771    this.region = initHRegion(tableName, method, conf, families);
3772    PutThread putThread = null;
3773    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
3774    try {
3775      putThread = new PutThread(numRows, families, qualifiers);
3776      putThread.start();
3777      putThread.waitForFirstPut();
3778
3779      // Add a thread that flushes as fast as possible
3780      ctx.addThread(new RepeatingTestThread(ctx) {
3781
3782        @Override
3783        public void doAnAction() throws Exception {
3784          region.flush(true);
3785          // Compact regularly to avoid creating too many files and exceeding
3786          // the ulimit.
3787          region.compact(false);
3788          for (HStore store : region.getStores()) {
3789            store.closeAndArchiveCompactedFiles();
3790          }
3791        }
3792      });
3793      ctx.startThreads();
3794
3795      Get get = new Get(Bytes.toBytes("row0"));
3796      Result result = null;
3797
3798      int expectedCount = numFamilies * numQualifiers;
3799
3800      long prevTimestamp = 0L;
3801      for (int i = 0; i < testCount; i++) {
3802        LOG.info("testWritesWhileGetting verify turn " + i);
3803        boolean previousEmpty = result == null || result.isEmpty();
3804        result = region.get(get);
3805        if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
3806          assertEquals("i=" + i, expectedCount, result.size());
3807          // TODO this was removed, now what dangit?!
3808          // search looking for the qualifier in question?
3809          long timestamp = 0;
3810          for (Cell kv : result.rawCells()) {
3811            if (CellUtil.matchingFamily(kv, families[0])
3812                && CellUtil.matchingQualifier(kv, qualifiers[0])) {
3813              timestamp = kv.getTimestamp();
3814            }
3815          }
3816          assertTrue(timestamp >= prevTimestamp);
3817          prevTimestamp = timestamp;
3818          Cell previousKV = null;
3819
3820          for (Cell kv : result.rawCells()) {
3821            byte[] thisValue = CellUtil.cloneValue(kv);
3822            if (previousKV != null) {
3823              if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) {
3824                LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV
3825                    + "(memStoreTS:" + previousKV.getSequenceId() + ")" + ", New KV: " + kv
3826                    + "(memStoreTS:" + kv.getSequenceId() + ")");
3827                assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue));
3828              }
3829            }
3830            previousKV = kv;
3831          }
3832        }
3833      }
3834    } finally {
3835      if (putThread != null)
3836        putThread.done();
3837
3838      region.flush(true);
3839
3840      if (putThread != null) {
3841        putThread.join();
3842        putThread.checkNoError();
3843      }
3844
3845      ctx.stop();
3846      HBaseTestingUtility.closeRegionAndWAL(this.region);
3847      this.region = null;
3848    }
3849  }
3850
3851  @Test
3852  public void testHolesInMeta() throws Exception {
3853    byte[] family = Bytes.toBytes("family");
3854    this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
3855        false, family);
3856    byte[] rowNotServed = Bytes.toBytes("a");
3857    Get g = new Get(rowNotServed);
3858    try {
3859      region.get(g);
3860      fail();
3861    } catch (WrongRegionException x) {
3862      // OK
3863    }
3864    byte[] row = Bytes.toBytes("y");
3865    g = new Get(row);
3866    region.get(g);
3867  }
3868
3869  @Test
3870  public void testIndexesScanWithOneDeletedRow() throws IOException {
3871    byte[] family = Bytes.toBytes("family");
3872
3873    // Setting up region
3874    this.region = initHRegion(tableName, method, CONF, family);
3875    Put put = new Put(Bytes.toBytes(1L));
3876    put.addColumn(family, qual1, 1L, Bytes.toBytes(1L));
3877    region.put(put);
3878
3879    region.flush(true);
3880
3881    Delete delete = new Delete(Bytes.toBytes(1L), 1L);
3882    region.delete(delete);
3883
3884    put = new Put(Bytes.toBytes(2L));
3885    put.addColumn(family, qual1, 2L, Bytes.toBytes(2L));
3886    region.put(put);
3887
3888    Scan idxScan = new Scan();
3889    idxScan.addFamily(family);
3890    idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList(
3891        new SingleColumnValueFilter(family, qual1, CompareOperator.GREATER_OR_EQUAL,
3892            new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1,
3893                    CompareOperator.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L))))));
3894    InternalScanner scanner = region.getScanner(idxScan);
3895    List<Cell> res = new ArrayList<>();
3896
3897    while (scanner.next(res)) {
3898      // Ignore res value.
3899    }
3900    assertEquals(1L, res.size());
3901  }
3902
3903  // ////////////////////////////////////////////////////////////////////////////
3904  // Bloom filter test
3905  // ////////////////////////////////////////////////////////////////////////////
3906  @Test
3907  public void testBloomFilterSize() throws IOException {
3908    byte[] fam1 = Bytes.toBytes("fam1");
3909    byte[] qf1 = Bytes.toBytes("col");
3910    byte[] val1 = Bytes.toBytes("value1");
3911    // Create Table
3912    HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE)
3913        .setBloomFilterType(BloomType.ROWCOL);
3914
3915    HTableDescriptor htd = new HTableDescriptor(tableName);
3916    htd.addFamily(hcd);
3917    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
3918    this.region = TEST_UTIL.createLocalHRegion(info, htd);
3919    int num_unique_rows = 10;
3920    int duplicate_multiplier = 2;
3921    int num_storefiles = 4;
3922
3923    int version = 0;
3924    for (int f = 0; f < num_storefiles; f++) {
3925      for (int i = 0; i < duplicate_multiplier; i++) {
3926        for (int j = 0; j < num_unique_rows; j++) {
3927          Put put = new Put(Bytes.toBytes("row" + j));
3928          put.setDurability(Durability.SKIP_WAL);
3929          long ts = version++;
3930          put.addColumn(fam1, qf1, ts, val1);
3931          region.put(put);
3932        }
3933      }
3934      region.flush(true);
3935    }
3936    // before compaction
3937    HStore store = region.getStore(fam1);
3938    Collection<HStoreFile> storeFiles = store.getStorefiles();
3939    for (HStoreFile storefile : storeFiles) {
3940      StoreFileReader reader = storefile.getReader();
3941      reader.loadFileInfo();
3942      reader.loadBloomfilter();
3943      assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
3944      assertEquals(num_unique_rows, reader.getFilterEntries());
3945    }
3946
3947    region.compact(true);
3948
3949    // after compaction
3950    storeFiles = store.getStorefiles();
3951    for (HStoreFile storefile : storeFiles) {
3952      StoreFileReader reader = storefile.getReader();
3953      reader.loadFileInfo();
3954      reader.loadBloomfilter();
3955      assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());
3956      assertEquals(num_unique_rows, reader.getFilterEntries());
3957    }
3958  }
3959
3960  @Test
3961  public void testAllColumnsWithBloomFilter() throws IOException {
3962    byte[] TABLE = Bytes.toBytes(name.getMethodName());
3963    byte[] FAMILY = Bytes.toBytes("family");
3964
3965    // Create table
3966    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE)
3967        .setBloomFilterType(BloomType.ROWCOL);
3968    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
3969    htd.addFamily(hcd);
3970    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
3971    this.region = TEST_UTIL.createLocalHRegion(info, htd);
3972    // For row:0, col:0: insert versions 1 through 5.
3973    byte[] row = Bytes.toBytes("row:" + 0);
3974    byte[] column = Bytes.toBytes("column:" + 0);
3975    Put put = new Put(row);
3976    put.setDurability(Durability.SKIP_WAL);
3977    for (long idx = 1; idx <= 4; idx++) {
3978      put.addColumn(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx));
3979    }
3980    region.put(put);
3981
3982    // Flush
3983    region.flush(true);
3984
3985    // Get rows
3986    Get get = new Get(row);
3987    get.readAllVersions();
3988    Cell[] kvs = region.get(get).rawCells();
3989
3990    // Check if rows are correct
3991    assertEquals(4, kvs.length);
3992    checkOneCell(kvs[0], FAMILY, 0, 0, 4);
3993    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
3994    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
3995    checkOneCell(kvs[3], FAMILY, 0, 0, 1);
3996  }
3997
3998  /**
3999   * Testcase to cover bug-fix for HBASE-2823 Ensures correct delete when
4000   * issuing delete row on columns with bloom filter set to row+col
4001   * (BloomType.ROWCOL)
4002   */
4003  @Test
4004  public void testDeleteRowWithBloomFilter() throws IOException {
4005    byte[] familyName = Bytes.toBytes("familyName");
4006
4007    // Create Table
4008    HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE)
4009        .setBloomFilterType(BloomType.ROWCOL);
4010
4011    HTableDescriptor htd = new HTableDescriptor(tableName);
4012    htd.addFamily(hcd);
4013    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4014    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4015    // Insert some data
4016    byte[] row = Bytes.toBytes("row1");
4017    byte[] col = Bytes.toBytes("col1");
4018
4019    Put put = new Put(row);
4020    put.addColumn(familyName, col, 1, Bytes.toBytes("SomeRandomValue"));
4021    region.put(put);
4022    region.flush(true);
4023
4024    Delete del = new Delete(row);
4025    region.delete(del);
4026    region.flush(true);
4027
4028    // Get remaining rows (should have none)
4029    Get get = new Get(row);
4030    get.addColumn(familyName, col);
4031
4032    Cell[] keyValues = region.get(get).rawCells();
4033    assertEquals(0, keyValues.length);
4034  }
4035
4036  @Test
4037  public void testgetHDFSBlocksDistribution() throws Exception {
4038    HBaseTestingUtility htu = new HBaseTestingUtility();
4039    // Why do we set the block size in this test?  If we set it smaller than the kvs, then we'll
4040    // break up the file in to more pieces that can be distributed across the three nodes and we
4041    // won't be able to have the condition this test asserts; that at least one node has
4042    // a copy of all replicas -- if small block size, then blocks are spread evenly across the
4043    // the three nodes.  hfilev3 with tags seems to put us over the block size.  St.Ack.
4044    // final int DEFAULT_BLOCK_SIZE = 1024;
4045    // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
4046    htu.getConfiguration().setInt("dfs.replication", 2);
4047
4048    // set up a cluster with 3 nodes
4049    MiniHBaseCluster cluster = null;
4050    String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
4051    int regionServersCount = 3;
4052
4053    try {
4054      StartMiniClusterOption option = StartMiniClusterOption.builder()
4055          .numRegionServers(regionServersCount).dataNodeHosts(dataNodeHosts).build();
4056      cluster = htu.startMiniCluster(option);
4057      byte[][] families = { fam1, fam2 };
4058      Table ht = htu.createTable(tableName, families);
4059
4060      // Setting up region
4061      byte row[] = Bytes.toBytes("row1");
4062      byte col[] = Bytes.toBytes("col1");
4063
4064      Put put = new Put(row);
4065      put.addColumn(fam1, col, 1, Bytes.toBytes("test1"));
4066      put.addColumn(fam2, col, 1, Bytes.toBytes("test2"));
4067      ht.put(put);
4068
4069      HRegion firstRegion = htu.getHBaseCluster().getRegions(tableName).get(0);
4070      firstRegion.flush(true);
4071      HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution();
4072
4073      // Given the default replication factor is 2 and we have 2 HFiles,
4074      // we will have total of 4 replica of blocks on 3 datanodes; thus there
4075      // must be at least one host that have replica for 2 HFiles. That host's
4076      // weight will be equal to the unique block weight.
4077      long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight();
4078      StringBuilder sb = new StringBuilder();
4079      for (String host: blocksDistribution1.getTopHosts()) {
4080        if (sb.length() > 0) sb.append(", ");
4081        sb.append(host);
4082        sb.append("=");
4083        sb.append(blocksDistribution1.getWeight(host));
4084      }
4085
4086      String topHost = blocksDistribution1.getTopHosts().get(0);
4087      long topHostWeight = blocksDistribution1.getWeight(topHost);
4088      String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" +
4089        topHostWeight + ", topHost=" + topHost + "; " + sb.toString();
4090      LOG.info(msg);
4091      assertTrue(msg, uniqueBlocksWeight1 == topHostWeight);
4092
4093      // use the static method to compute the value, it should be the same.
4094      // static method is used by load balancer or other components
4095      HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(
4096          htu.getConfiguration(), firstRegion.getTableDescriptor(), firstRegion.getRegionInfo());
4097      long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight();
4098
4099      assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
4100
4101      ht.close();
4102    } finally {
4103      if (cluster != null) {
4104        htu.shutdownMiniCluster();
4105      }
4106    }
4107  }
4108
4109  /**
4110   * Testcase to check state of region initialization task set to ABORTED or not
4111   * if any exceptions during initialization
4112   *
4113   * @throws Exception
4114   */
4115  @Test
4116  public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception {
4117    HRegionInfo info;
4118    try {
4119      FileSystem fs = Mockito.mock(FileSystem.class);
4120      Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException());
4121      HTableDescriptor htd = new HTableDescriptor(tableName);
4122      htd.addFamily(new HColumnDescriptor("cf"));
4123      info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
4124          HConstants.EMPTY_BYTE_ARRAY, false);
4125      Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
4126      region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
4127      // region initialization throws IOException and set task state to ABORTED.
4128      region.initialize();
4129      fail("Region initialization should fail due to IOException");
4130    } catch (IOException io) {
4131      List<MonitoredTask> tasks = TaskMonitor.get().getTasks();
4132      for (MonitoredTask monitoredTask : tasks) {
4133        if (!(monitoredTask instanceof MonitoredRPCHandler)
4134            && monitoredTask.getDescription().contains(region.toString())) {
4135          assertTrue("Region state should be ABORTED.",
4136              monitoredTask.getState().equals(MonitoredTask.State.ABORTED));
4137          break;
4138        }
4139      }
4140    }
4141  }
4142
4143  /**
4144   * Verifies that the .regioninfo file is written on region creation and that
4145   * is recreated if missing during region opening.
4146   */
4147  @Test
4148  public void testRegionInfoFileCreation() throws IOException {
4149    Path rootDir = new Path(dir + "testRegionInfoFileCreation");
4150
4151    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4152    htd.addFamily(new HColumnDescriptor("cf"));
4153
4154    HRegionInfo hri = new HRegionInfo(htd.getTableName());
4155
4156    // Create a region and skip the initialization (like CreateTableHandler)
4157    region = HBaseTestingUtility.createRegionAndWAL(hri, rootDir, CONF, htd, false);
4158    Path regionDir = region.getRegionFileSystem().getRegionDir();
4159    FileSystem fs = region.getRegionFileSystem().getFileSystem();
4160    HBaseTestingUtility.closeRegionAndWAL(region);
4161
4162    Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
4163
4164    // Verify that the .regioninfo file is present
4165    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4166        fs.exists(regionInfoFile));
4167
4168    // Try to open the region
4169    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4170    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4171    HBaseTestingUtility.closeRegionAndWAL(region);
4172
4173    // Verify that the .regioninfo file is still there
4174    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4175        fs.exists(regionInfoFile));
4176
4177    // Remove the .regioninfo file and verify is recreated on region open
4178    fs.delete(regionInfoFile, true);
4179    assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
4180        fs.exists(regionInfoFile));
4181
4182    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4183//    region = TEST_UTIL.openHRegion(hri, htd);
4184    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4185    HBaseTestingUtility.closeRegionAndWAL(region);
4186
4187    // Verify that the .regioninfo file is still there
4188    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4189        fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE)));
4190
4191    region = null;
4192  }
4193
4194  /**
4195   * TestCase for increment
4196   */
4197  private static class Incrementer implements Runnable {
4198    private HRegion region;
4199    private final static byte[] incRow = Bytes.toBytes("incRow");
4200    private final static byte[] family = Bytes.toBytes("family");
4201    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4202    private final static long ONE = 1L;
4203    private int incCounter;
4204
4205    public Incrementer(HRegion region, int incCounter) {
4206      this.region = region;
4207      this.incCounter = incCounter;
4208    }
4209
4210    @Override
4211    public void run() {
4212      int count = 0;
4213      while (count < incCounter) {
4214        Increment inc = new Increment(incRow);
4215        inc.addColumn(family, qualifier, ONE);
4216        count++;
4217        try {
4218          region.increment(inc);
4219        } catch (IOException e) {
4220          LOG.info("Count=" + count + ", " + e);
4221          break;
4222        }
4223      }
4224    }
4225  }
4226
4227  /**
4228   * Test case to check increment function with memstore flushing
4229   * @throws Exception
4230   */
4231  @Test
4232  public void testParallelIncrementWithMemStoreFlush() throws Exception {
4233    byte[] family = Incrementer.family;
4234    this.region = initHRegion(tableName, method, CONF, family);
4235    final HRegion region = this.region;
4236    final AtomicBoolean incrementDone = new AtomicBoolean(false);
4237    Runnable flusher = new Runnable() {
4238      @Override
4239      public void run() {
4240        while (!incrementDone.get()) {
4241          try {
4242            region.flush(true);
4243          } catch (Exception e) {
4244            e.printStackTrace();
4245          }
4246        }
4247      }
4248    };
4249
4250    // after all increment finished, the row will increment to 20*100 = 2000
4251    int threadNum = 20;
4252    int incCounter = 100;
4253    long expected = (long) threadNum * incCounter;
4254    Thread[] incrementers = new Thread[threadNum];
4255    Thread flushThread = new Thread(flusher);
4256    for (int i = 0; i < threadNum; i++) {
4257      incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
4258      incrementers[i].start();
4259    }
4260    flushThread.start();
4261    for (int i = 0; i < threadNum; i++) {
4262      incrementers[i].join();
4263    }
4264
4265    incrementDone.set(true);
4266    flushThread.join();
4267
4268    Get get = new Get(Incrementer.incRow);
4269    get.addColumn(Incrementer.family, Incrementer.qualifier);
4270    get.readVersions(1);
4271    Result res = this.region.get(get);
4272    List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
4273
4274    // we just got the latest version
4275    assertEquals(1, kvs.size());
4276    Cell kv = kvs.get(0);
4277    assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
4278  }
4279
4280  /**
4281   * TestCase for append
4282   */
4283  private static class Appender implements Runnable {
4284    private HRegion region;
4285    private final static byte[] appendRow = Bytes.toBytes("appendRow");
4286    private final static byte[] family = Bytes.toBytes("family");
4287    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4288    private final static byte[] CHAR = Bytes.toBytes("a");
4289    private int appendCounter;
4290
4291    public Appender(HRegion region, int appendCounter) {
4292      this.region = region;
4293      this.appendCounter = appendCounter;
4294    }
4295
4296    @Override
4297    public void run() {
4298      int count = 0;
4299      while (count < appendCounter) {
4300        Append app = new Append(appendRow);
4301        app.addColumn(family, qualifier, CHAR);
4302        count++;
4303        try {
4304          region.append(app);
4305        } catch (IOException e) {
4306          LOG.info("Count=" + count + ", max=" + appendCounter + ", " + e);
4307          break;
4308        }
4309      }
4310    }
4311  }
4312
4313  /**
4314   * Test case to check append function with memstore flushing
4315   * @throws Exception
4316   */
4317  @Test
4318  public void testParallelAppendWithMemStoreFlush() throws Exception {
4319    byte[] family = Appender.family;
4320    this.region = initHRegion(tableName, method, CONF, family);
4321    final HRegion region = this.region;
4322    final AtomicBoolean appendDone = new AtomicBoolean(false);
4323    Runnable flusher = new Runnable() {
4324      @Override
4325      public void run() {
4326        while (!appendDone.get()) {
4327          try {
4328            region.flush(true);
4329          } catch (Exception e) {
4330            e.printStackTrace();
4331          }
4332        }
4333      }
4334    };
4335
4336    // After all append finished, the value will append to threadNum *
4337    // appendCounter Appender.CHAR
4338    int threadNum = 20;
4339    int appendCounter = 100;
4340    byte[] expected = new byte[threadNum * appendCounter];
4341    for (int i = 0; i < threadNum * appendCounter; i++) {
4342      System.arraycopy(Appender.CHAR, 0, expected, i, 1);
4343    }
4344    Thread[] appenders = new Thread[threadNum];
4345    Thread flushThread = new Thread(flusher);
4346    for (int i = 0; i < threadNum; i++) {
4347      appenders[i] = new Thread(new Appender(this.region, appendCounter));
4348      appenders[i].start();
4349    }
4350    flushThread.start();
4351    for (int i = 0; i < threadNum; i++) {
4352      appenders[i].join();
4353    }
4354
4355    appendDone.set(true);
4356    flushThread.join();
4357
4358    Get get = new Get(Appender.appendRow);
4359    get.addColumn(Appender.family, Appender.qualifier);
4360    get.readVersions(1);
4361    Result res = this.region.get(get);
4362    List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
4363
4364    // we just got the latest version
4365    assertEquals(1, kvs.size());
4366    Cell kv = kvs.get(0);
4367    byte[] appendResult = new byte[kv.getValueLength()];
4368    System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
4369    assertArrayEquals(expected, appendResult);
4370  }
4371
4372  /**
4373   * Test case to check put function with memstore flushing for same row, same ts
4374   * @throws Exception
4375   */
4376  @Test
4377  public void testPutWithMemStoreFlush() throws Exception {
4378    byte[] family = Bytes.toBytes("family");
4379    byte[] qualifier = Bytes.toBytes("qualifier");
4380    byte[] row = Bytes.toBytes("putRow");
4381    byte[] value = null;
4382    this.region = initHRegion(tableName, method, CONF, family);
4383    Put put = null;
4384    Get get = null;
4385    List<Cell> kvs = null;
4386    Result res = null;
4387
4388    put = new Put(row);
4389    value = Bytes.toBytes("value0");
4390    put.addColumn(family, qualifier, 1234567L, value);
4391    region.put(put);
4392    get = new Get(row);
4393    get.addColumn(family, qualifier);
4394    get.readAllVersions();
4395    res = this.region.get(get);
4396    kvs = res.getColumnCells(family, qualifier);
4397    assertEquals(1, kvs.size());
4398    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4399
4400    region.flush(true);
4401    get = new Get(row);
4402    get.addColumn(family, qualifier);
4403    get.readAllVersions();
4404    res = this.region.get(get);
4405    kvs = res.getColumnCells(family, qualifier);
4406    assertEquals(1, kvs.size());
4407    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4408
4409    put = new Put(row);
4410    value = Bytes.toBytes("value1");
4411    put.addColumn(family, qualifier, 1234567L, value);
4412    region.put(put);
4413    get = new Get(row);
4414    get.addColumn(family, qualifier);
4415    get.readAllVersions();
4416    res = this.region.get(get);
4417    kvs = res.getColumnCells(family, qualifier);
4418    assertEquals(1, kvs.size());
4419    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4420
4421    region.flush(true);
4422    get = new Get(row);
4423    get.addColumn(family, qualifier);
4424    get.readAllVersions();
4425    res = this.region.get(get);
4426    kvs = res.getColumnCells(family, qualifier);
4427    assertEquals(1, kvs.size());
4428    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4429  }
4430
4431  @Test
4432  public void testDurability() throws Exception {
4433    // there are 5 x 5 cases:
4434    // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation
4435    // durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT)
4436
4437    // expected cases for append and sync wal
4438    durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4439    durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4440    durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4441
4442    durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4443    durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4444    durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4445
4446    durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4447    durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4448
4449    durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
4450    durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4451
4452    durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
4453    durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
4454    durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
4455
4456    // expected cases for async wal
4457    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4458    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4459    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4460    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4461    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
4462    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4463
4464    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4465    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4466    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4467    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4468    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
4469    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
4470
4471    // expect skip wal cases
4472    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4473    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4474    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4475    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
4476    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
4477    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
4478
4479  }
4480
4481  private void durabilityTest(String method, Durability tableDurability,
4482      Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
4483      final boolean expectSyncFromLogSyncer) throws Exception {
4484    Configuration conf = HBaseConfiguration.create(CONF);
4485    method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
4486    byte[] family = Bytes.toBytes("family");
4487    Path logDir = new Path(new Path(dir + method), "log");
4488    final Configuration walConf = new Configuration(conf);
4489    FSUtils.setRootDir(walConf, logDir);
4490    // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not
4491    // deal with classes which have a field of an inner class. See discussions in HBASE-15536.
4492    walConf.set(WALFactory.WAL_PROVIDER, "filesystem");
4493    final WALFactory wals = new WALFactory(walConf, TEST_UTIL.getRandomUUID().toString());
4494    final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()));
4495    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
4496        HConstants.EMPTY_END_ROW, false, tableDurability, wal,
4497        new byte[][] { family });
4498
4499    Put put = new Put(Bytes.toBytes("r1"));
4500    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
4501    put.setDurability(mutationDurability);
4502    region.put(put);
4503
4504    //verify append called or not
4505    verify(wal, expectAppend ? times(1) : never())
4506      .append((HRegionInfo)any(), (WALKeyImpl)any(),
4507          (WALEdit)any(), Mockito.anyBoolean());
4508
4509    // verify sync called or not
4510    if (expectSync || expectSyncFromLogSyncer) {
4511      TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
4512        @Override
4513        public boolean evaluate() throws Exception {
4514          try {
4515            if (expectSync) {
4516              verify(wal, times(1)).sync(anyLong()); // Hregion calls this one
4517            } else if (expectSyncFromLogSyncer) {
4518              verify(wal, times(1)).sync(); // wal syncer calls this one
4519            }
4520          } catch (Throwable ignore) {
4521          }
4522          return true;
4523        }
4524      });
4525    } else {
4526      //verify(wal, never()).sync(anyLong());
4527      verify(wal, never()).sync();
4528    }
4529
4530    HBaseTestingUtility.closeRegionAndWAL(this.region);
4531    wals.close();
4532    this.region = null;
4533  }
4534
4535  @Test
4536  public void testRegionReplicaSecondary() throws IOException {
4537    // create a primary region, load some data and flush
4538    // create a secondary region, and do a get against that
4539    Path rootDir = new Path(dir + name.getMethodName());
4540    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4541
4542    byte[][] families = new byte[][] {
4543        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4544    };
4545    byte[] cq = Bytes.toBytes("cq");
4546    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4547    for (byte[] family : families) {
4548      htd.addFamily(new HColumnDescriptor(family));
4549    }
4550
4551    long time = System.currentTimeMillis();
4552    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4553      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4554      false, time, 0);
4555    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4556      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4557      false, time, 1);
4558
4559    HRegion primaryRegion = null, secondaryRegion = null;
4560
4561    try {
4562      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4563          rootDir, TEST_UTIL.getConfiguration(), htd);
4564
4565      // load some data
4566      putData(primaryRegion, 0, 1000, cq, families);
4567
4568      // flush region
4569      primaryRegion.flush(true);
4570
4571      // open secondary region
4572      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4573
4574      verifyData(secondaryRegion, 0, 1000, cq, families);
4575    } finally {
4576      if (primaryRegion != null) {
4577        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4578      }
4579      if (secondaryRegion != null) {
4580        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4581      }
4582    }
4583  }
4584
4585  @Test
4586  public void testRegionReplicaSecondaryIsReadOnly() throws IOException {
4587    // create a primary region, load some data and flush
4588    // create a secondary region, and do a put against that
4589    Path rootDir = new Path(dir + name.getMethodName());
4590    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4591
4592    byte[][] families = new byte[][] {
4593        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4594    };
4595    byte[] cq = Bytes.toBytes("cq");
4596    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4597    for (byte[] family : families) {
4598      htd.addFamily(new HColumnDescriptor(family));
4599    }
4600
4601    long time = System.currentTimeMillis();
4602    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4603      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4604      false, time, 0);
4605    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4606      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4607      false, time, 1);
4608
4609    HRegion primaryRegion = null, secondaryRegion = null;
4610
4611    try {
4612      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4613          rootDir, TEST_UTIL.getConfiguration(), htd);
4614
4615      // load some data
4616      putData(primaryRegion, 0, 1000, cq, families);
4617
4618      // flush region
4619      primaryRegion.flush(true);
4620
4621      // open secondary region
4622      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4623
4624      try {
4625        putData(secondaryRegion, 0, 1000, cq, families);
4626        fail("Should have thrown exception");
4627      } catch (IOException ex) {
4628        // expected
4629      }
4630    } finally {
4631      if (primaryRegion != null) {
4632        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4633      }
4634      if (secondaryRegion != null) {
4635        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4636      }
4637    }
4638  }
4639
4640  static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
4641    Configuration confForWAL = new Configuration(conf);
4642    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
4643    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8));
4644  }
4645
4646  @Test
4647  public void testCompactionFromPrimary() throws IOException {
4648    Path rootDir = new Path(dir + name.getMethodName());
4649    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4650
4651    byte[][] families = new byte[][] {
4652        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4653    };
4654    byte[] cq = Bytes.toBytes("cq");
4655    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4656    for (byte[] family : families) {
4657      htd.addFamily(new HColumnDescriptor(family));
4658    }
4659
4660    long time = System.currentTimeMillis();
4661    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4662      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4663      false, time, 0);
4664    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4665      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4666      false, time, 1);
4667
4668    HRegion primaryRegion = null, secondaryRegion = null;
4669
4670    try {
4671      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4672          rootDir, TEST_UTIL.getConfiguration(), htd);
4673
4674      // load some data
4675      putData(primaryRegion, 0, 1000, cq, families);
4676
4677      // flush region
4678      primaryRegion.flush(true);
4679
4680      // open secondary region
4681      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4682
4683      // move the file of the primary region to the archive, simulating a compaction
4684      Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
4685      primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
4686      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
4687          .getStoreFiles(families[0]);
4688      Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
4689
4690      verifyData(secondaryRegion, 0, 1000, cq, families);
4691    } finally {
4692      if (primaryRegion != null) {
4693        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4694      }
4695      if (secondaryRegion != null) {
4696        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4697      }
4698    }
4699  }
4700
4701  private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws
4702      IOException {
4703    putData(this.region, startRow, numRows, qf, families);
4704  }
4705
4706  private void putData(HRegion region,
4707      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4708    putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
4709  }
4710
4711  static void putData(HRegion region, Durability durability,
4712      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4713    for (int i = startRow; i < startRow + numRows; i++) {
4714      Put put = new Put(Bytes.toBytes("" + i));
4715      put.setDurability(durability);
4716      for (byte[] family : families) {
4717        put.addColumn(family, qf, null);
4718      }
4719      region.put(put);
4720      LOG.info(put.toString());
4721    }
4722  }
4723
4724  static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
4725      throws IOException {
4726    for (int i = startRow; i < startRow + numRows; i++) {
4727      byte[] row = Bytes.toBytes("" + i);
4728      Get get = new Get(row);
4729      for (byte[] family : families) {
4730        get.addColumn(family, qf);
4731      }
4732      Result result = newReg.get(get);
4733      Cell[] raw = result.rawCells();
4734      assertEquals(families.length, result.size());
4735      for (int j = 0; j < families.length; j++) {
4736        assertTrue(CellUtil.matchingRows(raw[j], row));
4737        assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
4738        assertTrue(CellUtil.matchingQualifier(raw[j], qf));
4739      }
4740    }
4741  }
4742
4743  static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
4744    // Now I have k, get values out and assert they are as expected.
4745    Get get = new Get(k).addFamily(family).readAllVersions();
4746    Cell[] results = r.get(get).rawCells();
4747    for (int j = 0; j < results.length; j++) {
4748      byte[] tmp = CellUtil.cloneValue(results[j]);
4749      // Row should be equal to value every time.
4750      assertTrue(Bytes.equals(k, tmp));
4751    }
4752  }
4753
4754  /*
4755   * Assert first value in the passed region is <code>firstValue</code>.
4756   *
4757   * @param r
4758   *
4759   * @param fs
4760   *
4761   * @param firstValue
4762   *
4763   * @throws IOException
4764   */
4765  protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
4766      throws IOException {
4767    byte[][] families = { fs };
4768    Scan scan = new Scan();
4769    for (int i = 0; i < families.length; i++)
4770      scan.addFamily(families[i]);
4771    InternalScanner s = r.getScanner(scan);
4772    try {
4773      List<Cell> curVals = new ArrayList<>();
4774      boolean first = true;
4775      OUTER_LOOP: while (s.next(curVals)) {
4776        for (Cell kv : curVals) {
4777          byte[] val = CellUtil.cloneValue(kv);
4778          byte[] curval = val;
4779          if (first) {
4780            first = false;
4781            assertTrue(Bytes.compareTo(curval, firstValue) == 0);
4782          } else {
4783            // Not asserting anything. Might as well break.
4784            break OUTER_LOOP;
4785          }
4786        }
4787      }
4788    } finally {
4789      s.close();
4790    }
4791  }
4792
4793  /**
4794   * Test that we get the expected flush results back
4795   */
4796  @Test
4797  public void testFlushResult() throws IOException {
4798    byte[] family = Bytes.toBytes("family");
4799
4800    this.region = initHRegion(tableName, method, family);
4801
4802    // empty memstore, flush doesn't run
4803    HRegion.FlushResult fr = region.flush(true);
4804    assertFalse(fr.isFlushSucceeded());
4805    assertFalse(fr.isCompactionNeeded());
4806
4807    // Flush enough files to get up to the threshold, doesn't need compactions
4808    for (int i = 0; i < 2; i++) {
4809      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
4810      region.put(put);
4811      fr = region.flush(true);
4812      assertTrue(fr.isFlushSucceeded());
4813      assertFalse(fr.isCompactionNeeded());
4814    }
4815
4816    // Two flushes after the threshold, compactions are needed
4817    for (int i = 0; i < 2; i++) {
4818      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
4819      region.put(put);
4820      fr = region.flush(true);
4821      assertTrue(fr.isFlushSucceeded());
4822      assertTrue(fr.isCompactionNeeded());
4823    }
4824  }
4825
4826  protected Configuration initSplit() {
4827    // Always compact if there is more than one store file.
4828    CONF.setInt("hbase.hstore.compactionThreshold", 2);
4829
4830    CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
4831
4832    // Increase the amount of time between client retries
4833    CONF.setLong("hbase.client.pause", 15 * 1000);
4834
4835    // This size should make it so we always split using the addContent
4836    // below. After adding all data, the first region is 1.3M
4837    CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
4838    return CONF;
4839  }
4840
4841  /**
4842   * @return A region on which you must call
4843   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4844   */
4845  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
4846      byte[]... families) throws IOException {
4847    return initHRegion(tableName, callingMethod, conf, false, families);
4848  }
4849
4850  /**
4851   * @return A region on which you must call
4852   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4853   */
4854  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
4855      boolean isReadOnly, byte[]... families) throws IOException {
4856    return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
4857  }
4858
4859  protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
4860      String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
4861      throws IOException {
4862    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
4863    HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
4864    final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
4865    return initHRegion(tableName, startKey, stopKey, isReadOnly,
4866        Durability.SYNC_WAL, wal, families);
4867  }
4868
4869  /**
4870   * @return A region on which you must call
4871   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4872   */
4873  public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
4874      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
4875    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
4876    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey,
4877        isReadOnly, durability, wal, families);
4878  }
4879
4880  /**
4881   * Assert that the passed in Cell has expected contents for the specified row,
4882   * column & timestamp.
4883   */
4884  private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
4885    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
4886    assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx,
4887        Bytes.toString(CellUtil.cloneRow(kv)));
4888    assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
4889        Bytes.toString(CellUtil.cloneFamily(kv)));
4890    assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
4891        Bytes.toString(CellUtil.cloneQualifier(kv)));
4892    assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
4893    assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
4894        Bytes.toString(CellUtil.cloneValue(kv)));
4895  }
4896
4897  @Test
4898  public void testReverseScanner_FromMemStore_SingleCF_Normal()
4899      throws IOException {
4900    byte[] rowC = Bytes.toBytes("rowC");
4901    byte[] rowA = Bytes.toBytes("rowA");
4902    byte[] rowB = Bytes.toBytes("rowB");
4903    byte[] cf = Bytes.toBytes("CF");
4904    byte[][] families = { cf };
4905    byte[] col = Bytes.toBytes("C");
4906    long ts = 1;
4907    this.region = initHRegion(tableName, method, families);
4908    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
4909    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
4910        null);
4911    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
4912    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
4913    Put put = null;
4914    put = new Put(rowC);
4915    put.add(kv1);
4916    put.add(kv11);
4917    region.put(put);
4918    put = new Put(rowA);
4919    put.add(kv2);
4920    region.put(put);
4921    put = new Put(rowB);
4922    put.add(kv3);
4923    region.put(put);
4924
4925    Scan scan = new Scan(rowC);
4926    scan.setMaxVersions(5);
4927    scan.setReversed(true);
4928    InternalScanner scanner = region.getScanner(scan);
4929    List<Cell> currRow = new ArrayList<>();
4930    boolean hasNext = scanner.next(currRow);
4931    assertEquals(2, currRow.size());
4932    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4933        .get(0).getRowLength(), rowC, 0, rowC.length));
4934    assertTrue(hasNext);
4935    currRow.clear();
4936    hasNext = scanner.next(currRow);
4937    assertEquals(1, currRow.size());
4938    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4939        .get(0).getRowLength(), rowB, 0, rowB.length));
4940    assertTrue(hasNext);
4941    currRow.clear();
4942    hasNext = scanner.next(currRow);
4943    assertEquals(1, currRow.size());
4944    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4945        .get(0).getRowLength(), rowA, 0, rowA.length));
4946    assertFalse(hasNext);
4947    scanner.close();
4948  }
4949
4950  @Test
4951  public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
4952      throws IOException {
4953    byte[] rowC = Bytes.toBytes("rowC");
4954    byte[] rowA = Bytes.toBytes("rowA");
4955    byte[] rowB = Bytes.toBytes("rowB");
4956    byte[] rowD = Bytes.toBytes("rowD");
4957    byte[] cf = Bytes.toBytes("CF");
4958    byte[][] families = { cf };
4959    byte[] col = Bytes.toBytes("C");
4960    long ts = 1;
4961    this.region = initHRegion(tableName, method, families);
4962    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
4963    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
4964        null);
4965    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
4966    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
4967    Put put = null;
4968    put = new Put(rowC);
4969    put.add(kv1);
4970    put.add(kv11);
4971    region.put(put);
4972    put = new Put(rowA);
4973    put.add(kv2);
4974    region.put(put);
4975    put = new Put(rowB);
4976    put.add(kv3);
4977    region.put(put);
4978
4979    Scan scan = new Scan(rowD);
4980    List<Cell> currRow = new ArrayList<>();
4981    scan.setReversed(true);
4982    scan.setMaxVersions(5);
4983    InternalScanner scanner = region.getScanner(scan);
4984    boolean hasNext = scanner.next(currRow);
4985    assertEquals(2, currRow.size());
4986    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4987        .get(0).getRowLength(), rowC, 0, rowC.length));
4988    assertTrue(hasNext);
4989    currRow.clear();
4990    hasNext = scanner.next(currRow);
4991    assertEquals(1, currRow.size());
4992    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4993        .get(0).getRowLength(), rowB, 0, rowB.length));
4994    assertTrue(hasNext);
4995    currRow.clear();
4996    hasNext = scanner.next(currRow);
4997    assertEquals(1, currRow.size());
4998    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4999        .get(0).getRowLength(), rowA, 0, rowA.length));
5000    assertFalse(hasNext);
5001    scanner.close();
5002  }
5003
5004  @Test
5005  public void testReverseScanner_FromMemStore_SingleCF_FullScan()
5006      throws IOException {
5007    byte[] rowC = Bytes.toBytes("rowC");
5008    byte[] rowA = Bytes.toBytes("rowA");
5009    byte[] rowB = Bytes.toBytes("rowB");
5010    byte[] cf = Bytes.toBytes("CF");
5011    byte[][] families = { cf };
5012    byte[] col = Bytes.toBytes("C");
5013    long ts = 1;
5014    this.region = initHRegion(tableName, method, families);
5015    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5016    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5017        null);
5018    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5019    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5020    Put put = null;
5021    put = new Put(rowC);
5022    put.add(kv1);
5023    put.add(kv11);
5024    region.put(put);
5025    put = new Put(rowA);
5026    put.add(kv2);
5027    region.put(put);
5028    put = new Put(rowB);
5029    put.add(kv3);
5030    region.put(put);
5031    Scan scan = new Scan();
5032    List<Cell> currRow = new ArrayList<>();
5033    scan.setReversed(true);
5034    InternalScanner scanner = region.getScanner(scan);
5035    boolean hasNext = scanner.next(currRow);
5036    assertEquals(1, currRow.size());
5037    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5038        .get(0).getRowLength(), rowC, 0, rowC.length));
5039    assertTrue(hasNext);
5040    currRow.clear();
5041    hasNext = scanner.next(currRow);
5042    assertEquals(1, currRow.size());
5043    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5044        .get(0).getRowLength(), rowB, 0, rowB.length));
5045    assertTrue(hasNext);
5046    currRow.clear();
5047    hasNext = scanner.next(currRow);
5048    assertEquals(1, currRow.size());
5049    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5050        .get(0).getRowLength(), rowA, 0, rowA.length));
5051    assertFalse(hasNext);
5052    scanner.close();
5053  }
5054
5055  @Test
5056  public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
5057    // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
5058    byte[] rowA = Bytes.toBytes("rowA");
5059    byte[] rowB = Bytes.toBytes("rowB");
5060    byte[] rowC = Bytes.toBytes("rowC");
5061    byte[] rowD = Bytes.toBytes("rowD");
5062    byte[] rowE = Bytes.toBytes("rowE");
5063    byte[] cf = Bytes.toBytes("CF");
5064    byte[][] families = { cf };
5065    byte[] col1 = Bytes.toBytes("col1");
5066    byte[] col2 = Bytes.toBytes("col2");
5067    long ts = 1;
5068    this.region = initHRegion(tableName, method, families);
5069    KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5070    KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5071    KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5072    KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5073    KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5074    KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5075    Put put = null;
5076    put = new Put(rowA);
5077    put.add(kv1);
5078    region.put(put);
5079    put = new Put(rowB);
5080    put.add(kv2);
5081    region.put(put);
5082    put = new Put(rowC);
5083    put.add(kv3);
5084    region.put(put);
5085    put = new Put(rowD);
5086    put.add(kv4_1);
5087    region.put(put);
5088    put = new Put(rowD);
5089    put.add(kv4_2);
5090    region.put(put);
5091    put = new Put(rowE);
5092    put.add(kv5);
5093    region.put(put);
5094    region.flush(true);
5095    Scan scan = new Scan(rowD, rowA);
5096    scan.addColumn(families[0], col1);
5097    scan.setReversed(true);
5098    List<Cell> currRow = new ArrayList<>();
5099    InternalScanner scanner = region.getScanner(scan);
5100    boolean hasNext = scanner.next(currRow);
5101    assertEquals(1, currRow.size());
5102    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5103        .get(0).getRowLength(), rowD, 0, rowD.length));
5104    assertTrue(hasNext);
5105    currRow.clear();
5106    hasNext = scanner.next(currRow);
5107    assertEquals(1, currRow.size());
5108    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5109        .get(0).getRowLength(), rowC, 0, rowC.length));
5110    assertTrue(hasNext);
5111    currRow.clear();
5112    hasNext = scanner.next(currRow);
5113    assertEquals(1, currRow.size());
5114    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5115        .get(0).getRowLength(), rowB, 0, rowB.length));
5116    assertFalse(hasNext);
5117    scanner.close();
5118
5119    scan = new Scan(rowD, rowA);
5120    scan.addColumn(families[0], col2);
5121    scan.setReversed(true);
5122    currRow.clear();
5123    scanner = region.getScanner(scan);
5124    hasNext = scanner.next(currRow);
5125    assertEquals(1, currRow.size());
5126    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5127        .get(0).getRowLength(), rowD, 0, rowD.length));
5128    scanner.close();
5129  }
5130
5131  @Test
5132  public void testReverseScanner_smaller_blocksize() throws IOException {
5133    // case to ensure no conflict with HFile index optimization
5134    byte[] rowA = Bytes.toBytes("rowA");
5135    byte[] rowB = Bytes.toBytes("rowB");
5136    byte[] rowC = Bytes.toBytes("rowC");
5137    byte[] rowD = Bytes.toBytes("rowD");
5138    byte[] rowE = Bytes.toBytes("rowE");
5139    byte[] cf = Bytes.toBytes("CF");
5140    byte[][] families = { cf };
5141    byte[] col1 = Bytes.toBytes("col1");
5142    byte[] col2 = Bytes.toBytes("col2");
5143    long ts = 1;
5144    HBaseConfiguration config = new HBaseConfiguration();
5145    config.setInt("test.block.size", 1);
5146    this.region = initHRegion(tableName, method, config, families);
5147    KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5148    KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5149    KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5150    KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5151    KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5152    KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5153    Put put = null;
5154    put = new Put(rowA);
5155    put.add(kv1);
5156    region.put(put);
5157    put = new Put(rowB);
5158    put.add(kv2);
5159    region.put(put);
5160    put = new Put(rowC);
5161    put.add(kv3);
5162    region.put(put);
5163    put = new Put(rowD);
5164    put.add(kv4_1);
5165    region.put(put);
5166    put = new Put(rowD);
5167    put.add(kv4_2);
5168    region.put(put);
5169    put = new Put(rowE);
5170    put.add(kv5);
5171    region.put(put);
5172    region.flush(true);
5173    Scan scan = new Scan(rowD, rowA);
5174    scan.addColumn(families[0], col1);
5175    scan.setReversed(true);
5176    List<Cell> currRow = new ArrayList<>();
5177    InternalScanner scanner = region.getScanner(scan);
5178    boolean hasNext = scanner.next(currRow);
5179    assertEquals(1, currRow.size());
5180    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5181        .get(0).getRowLength(), rowD, 0, rowD.length));
5182    assertTrue(hasNext);
5183    currRow.clear();
5184    hasNext = scanner.next(currRow);
5185    assertEquals(1, currRow.size());
5186    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5187        .get(0).getRowLength(), rowC, 0, rowC.length));
5188    assertTrue(hasNext);
5189    currRow.clear();
5190    hasNext = scanner.next(currRow);
5191    assertEquals(1, currRow.size());
5192    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5193        .get(0).getRowLength(), rowB, 0, rowB.length));
5194    assertFalse(hasNext);
5195    scanner.close();
5196
5197    scan = new Scan(rowD, rowA);
5198    scan.addColumn(families[0], col2);
5199    scan.setReversed(true);
5200    currRow.clear();
5201    scanner = region.getScanner(scan);
5202    hasNext = scanner.next(currRow);
5203    assertEquals(1, currRow.size());
5204    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5205        .get(0).getRowLength(), rowD, 0, rowD.length));
5206    scanner.close();
5207  }
5208
5209  @Test
5210  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
5211      throws IOException {
5212    byte[] row0 = Bytes.toBytes("row0"); // 1 kv
5213    byte[] row1 = Bytes.toBytes("row1"); // 2 kv
5214    byte[] row2 = Bytes.toBytes("row2"); // 4 kv
5215    byte[] row3 = Bytes.toBytes("row3"); // 2 kv
5216    byte[] row4 = Bytes.toBytes("row4"); // 5 kv
5217    byte[] row5 = Bytes.toBytes("row5"); // 2 kv
5218    byte[] cf1 = Bytes.toBytes("CF1");
5219    byte[] cf2 = Bytes.toBytes("CF2");
5220    byte[] cf3 = Bytes.toBytes("CF3");
5221    byte[][] families = { cf1, cf2, cf3 };
5222    byte[] col = Bytes.toBytes("C");
5223    long ts = 1;
5224    HBaseConfiguration conf = new HBaseConfiguration();
5225    // disable compactions in this test.
5226    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5227    this.region = initHRegion(tableName, method, conf, families);
5228    // kv naming style: kv(row number) totalKvCountInThisRow seq no
5229    KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
5230        null);
5231    KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
5232        null);
5233    KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
5234        KeyValue.Type.Put, null);
5235    KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
5236        null);
5237    KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
5238        null);
5239    KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
5240        null);
5241    KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
5242        KeyValue.Type.Put, null);
5243    KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
5244        null);
5245    KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
5246        KeyValue.Type.Put, null);
5247    KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
5248        null);
5249    KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
5250        null);
5251    KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
5252        KeyValue.Type.Put, null);
5253    KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
5254        null);
5255    KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
5256        KeyValue.Type.Put, null);
5257    KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
5258        null);
5259    KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
5260        null);
5261    // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv)
5262    Put put = null;
5263    put = new Put(row1);
5264    put.add(kv1_2_1);
5265    region.put(put);
5266    put = new Put(row2);
5267    put.add(kv2_4_1);
5268    region.put(put);
5269    put = new Put(row4);
5270    put.add(kv4_5_4);
5271    put.add(kv4_5_5);
5272    region.put(put);
5273    region.flush(true);
5274    // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv)
5275    put = new Put(row4);
5276    put.add(kv4_5_1);
5277    put.add(kv4_5_3);
5278    region.put(put);
5279    put = new Put(row1);
5280    put.add(kv1_2_2);
5281    region.put(put);
5282    put = new Put(row2);
5283    put.add(kv2_4_4);
5284    region.put(put);
5285    region.flush(true);
5286    // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv)
5287    put = new Put(row4);
5288    put.add(kv4_5_2);
5289    region.put(put);
5290    put = new Put(row2);
5291    put.add(kv2_4_2);
5292    put.add(kv2_4_3);
5293    region.put(put);
5294    put = new Put(row3);
5295    put.add(kv3_2_2);
5296    region.put(put);
5297    region.flush(true);
5298    // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max)
5299    // ( 2 kv)
5300    put = new Put(row0);
5301    put.add(kv0_1_1);
5302    region.put(put);
5303    put = new Put(row3);
5304    put.add(kv3_2_1);
5305    region.put(put);
5306    put = new Put(row5);
5307    put.add(kv5_2_1);
5308    put.add(kv5_2_2);
5309    region.put(put);
5310    // scan range = ["row4", min), skip the max "row5"
5311    Scan scan = new Scan(row4);
5312    scan.setMaxVersions(5);
5313    scan.setBatch(3);
5314    scan.setReversed(true);
5315    InternalScanner scanner = region.getScanner(scan);
5316    List<Cell> currRow = new ArrayList<>();
5317    boolean hasNext = false;
5318    // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
5319    // included in scan range
5320    // "row4" takes 2 next() calls since batch=3
5321    hasNext = scanner.next(currRow);
5322    assertEquals(3, currRow.size());
5323    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5324        .get(0).getRowLength(), row4, 0, row4.length));
5325    assertTrue(hasNext);
5326    currRow.clear();
5327    hasNext = scanner.next(currRow);
5328    assertEquals(2, currRow.size());
5329    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
5330        currRow.get(0).getRowLength(), row4, 0,
5331      row4.length));
5332    assertTrue(hasNext);
5333    // 2. scan out "row3" (2 kv)
5334    currRow.clear();
5335    hasNext = scanner.next(currRow);
5336    assertEquals(2, currRow.size());
5337    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5338        .get(0).getRowLength(), row3, 0, row3.length));
5339    assertTrue(hasNext);
5340    // 3. scan out "row2" (4 kvs)
5341    // "row2" takes 2 next() calls since batch=3
5342    currRow.clear();
5343    hasNext = scanner.next(currRow);
5344    assertEquals(3, currRow.size());
5345    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5346        .get(0).getRowLength(), row2, 0, row2.length));
5347    assertTrue(hasNext);
5348    currRow.clear();
5349    hasNext = scanner.next(currRow);
5350    assertEquals(1, currRow.size());
5351    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5352        .get(0).getRowLength(), row2, 0, row2.length));
5353    assertTrue(hasNext);
5354    // 4. scan out "row1" (2 kv)
5355    currRow.clear();
5356    hasNext = scanner.next(currRow);
5357    assertEquals(2, currRow.size());
5358    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5359        .get(0).getRowLength(), row1, 0, row1.length));
5360    assertTrue(hasNext);
5361    // 5. scan out "row0" (1 kv)
5362    currRow.clear();
5363    hasNext = scanner.next(currRow);
5364    assertEquals(1, currRow.size());
5365    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5366        .get(0).getRowLength(), row0, 0, row0.length));
5367    assertFalse(hasNext);
5368
5369    scanner.close();
5370  }
5371
5372  @Test
5373  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
5374      throws IOException {
5375    byte[] row1 = Bytes.toBytes("row1");
5376    byte[] row2 = Bytes.toBytes("row2");
5377    byte[] row3 = Bytes.toBytes("row3");
5378    byte[] row4 = Bytes.toBytes("row4");
5379    byte[] cf1 = Bytes.toBytes("CF1");
5380    byte[] cf2 = Bytes.toBytes("CF2");
5381    byte[] cf3 = Bytes.toBytes("CF3");
5382    byte[] cf4 = Bytes.toBytes("CF4");
5383    byte[][] families = { cf1, cf2, cf3, cf4 };
5384    byte[] col = Bytes.toBytes("C");
5385    long ts = 1;
5386    HBaseConfiguration conf = new HBaseConfiguration();
5387    // disable compactions in this test.
5388    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5389    this.region = initHRegion(tableName, method, conf, families);
5390    KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
5391    KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
5392    KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
5393    KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
5394    // storefile1
5395    Put put = new Put(row1);
5396    put.add(kv1);
5397    region.put(put);
5398    region.flush(true);
5399    // storefile2
5400    put = new Put(row2);
5401    put.add(kv2);
5402    region.put(put);
5403    region.flush(true);
5404    // storefile3
5405    put = new Put(row3);
5406    put.add(kv3);
5407    region.put(put);
5408    region.flush(true);
5409    // memstore
5410    put = new Put(row4);
5411    put.add(kv4);
5412    region.put(put);
5413    // scan range = ["row4", min)
5414    Scan scan = new Scan(row4);
5415    scan.setReversed(true);
5416    scan.setBatch(10);
5417    InternalScanner scanner = region.getScanner(scan);
5418    List<Cell> currRow = new ArrayList<>();
5419    boolean hasNext = scanner.next(currRow);
5420    assertEquals(1, currRow.size());
5421    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5422        .get(0).getRowLength(), row4, 0, row4.length));
5423    assertTrue(hasNext);
5424    currRow.clear();
5425    hasNext = scanner.next(currRow);
5426    assertEquals(1, currRow.size());
5427    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5428        .get(0).getRowLength(), row3, 0, row3.length));
5429    assertTrue(hasNext);
5430    currRow.clear();
5431    hasNext = scanner.next(currRow);
5432    assertEquals(1, currRow.size());
5433    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5434        .get(0).getRowLength(), row2, 0, row2.length));
5435    assertTrue(hasNext);
5436    currRow.clear();
5437    hasNext = scanner.next(currRow);
5438    assertEquals(1, currRow.size());
5439    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5440        .get(0).getRowLength(), row1, 0, row1.length));
5441    assertFalse(hasNext);
5442  }
5443
5444  /**
5445   * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking
5446   */
5447  @Test
5448  public void testReverseScanner_StackOverflow() throws IOException {
5449    byte[] cf1 = Bytes.toBytes("CF1");
5450    byte[][] families = {cf1};
5451    byte[] col = Bytes.toBytes("C");
5452    HBaseConfiguration conf = new HBaseConfiguration();
5453    this.region = initHRegion(tableName, method, conf, families);
5454    // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5455    Put put = new Put(Bytes.toBytes("19998"));
5456    put.addColumn(cf1, col, Bytes.toBytes("val"));
5457    region.put(put);
5458    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5459    Put put2 = new Put(Bytes.toBytes("19997"));
5460    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5461    region.put(put2);
5462
5463    Scan scan = new Scan(Bytes.toBytes("19998"));
5464    scan.setReversed(true);
5465    InternalScanner scanner = region.getScanner(scan);
5466
5467    // create one storefile contains many rows will be skipped
5468    // to check StoreFileScanner.seekToPreviousRow
5469    for (int i = 10000; i < 20000; i++) {
5470      Put p = new Put(Bytes.toBytes(""+i));
5471      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5472      region.put(p);
5473    }
5474    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5475
5476    // create one memstore contains many rows will be skipped
5477    // to check MemStoreScanner.seekToPreviousRow
5478    for (int i = 10000; i < 20000; i++) {
5479      Put p = new Put(Bytes.toBytes(""+i));
5480      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5481      region.put(p);
5482    }
5483
5484    List<Cell> currRow = new ArrayList<>();
5485    boolean hasNext;
5486    do {
5487      hasNext = scanner.next(currRow);
5488    } while (hasNext);
5489    assertEquals(2, currRow.size());
5490    assertEquals("19998", Bytes.toString(currRow.get(0).getRowArray(),
5491      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5492    assertEquals("19997", Bytes.toString(currRow.get(1).getRowArray(),
5493      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5494  }
5495
5496  @Test
5497  public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception {
5498    byte[] cf1 = Bytes.toBytes("CF1");
5499    byte[][] families = { cf1 };
5500    byte[] col = Bytes.toBytes("C");
5501    HBaseConfiguration conf = new HBaseConfiguration();
5502    this.region = initHRegion(tableName, method, conf, families);
5503    // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5504    Put put = new Put(Bytes.toBytes("19996"));
5505    put.addColumn(cf1, col, Bytes.toBytes("val"));
5506    region.put(put);
5507    Put put2 = new Put(Bytes.toBytes("19995"));
5508    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5509    region.put(put2);
5510    // create a reverse scan
5511    Scan scan = new Scan(Bytes.toBytes("19996"));
5512    scan.setReversed(true);
5513    RegionScannerImpl scanner = region.getScanner(scan);
5514
5515    // flush the cache. This will reset the store scanner
5516    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5517
5518    // create one memstore contains many rows will be skipped
5519    // to check MemStoreScanner.seekToPreviousRow
5520    for (int i = 10000; i < 20000; i++) {
5521      Put p = new Put(Bytes.toBytes("" + i));
5522      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5523      region.put(p);
5524    }
5525    List<Cell> currRow = new ArrayList<>();
5526    boolean hasNext;
5527    boolean assertDone = false;
5528    do {
5529      hasNext = scanner.next(currRow);
5530      // With HBASE-15871, after the scanner is reset the memstore scanner should not be
5531      // added here
5532      if (!assertDone) {
5533        StoreScanner current =
5534            (StoreScanner) (scanner.storeHeap).getCurrentForTesting();
5535        List<KeyValueScanner> scanners = current.getAllScannersForTesting();
5536        assertEquals("There should be only one scanner the store file scanner", 1,
5537          scanners.size());
5538        assertDone = true;
5539      }
5540    } while (hasNext);
5541    assertEquals(2, currRow.size());
5542    assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(),
5543      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5544    assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(),
5545      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5546  }
5547
5548  @Test
5549  public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception {
5550    byte[] cf1 = Bytes.toBytes("CF1");
5551    byte[][] families = { cf1 };
5552    byte[] col = Bytes.toBytes("C");
5553
5554    HBaseConfiguration conf = new HBaseConfiguration();
5555    this.region = initHRegion(tableName, method, conf, families);
5556
5557    Put put = new Put(Bytes.toBytes("199996"));
5558    put.addColumn(cf1, col, Bytes.toBytes("val"));
5559    region.put(put);
5560    Put put2 = new Put(Bytes.toBytes("199995"));
5561    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5562    region.put(put2);
5563
5564    // Create a reverse scan
5565    Scan scan = new Scan(Bytes.toBytes("199996"));
5566    scan.setReversed(true);
5567    RegionScannerImpl scanner = region.getScanner(scan);
5568
5569    // Put a lot of cells that have sequenceIDs grater than the readPt of the reverse scan
5570    for (int i = 100000; i < 200000; i++) {
5571      Put p = new Put(Bytes.toBytes("" + i));
5572      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5573      region.put(p);
5574    }
5575    List<Cell> currRow = new ArrayList<>();
5576    boolean hasNext;
5577    do {
5578      hasNext = scanner.next(currRow);
5579    } while (hasNext);
5580
5581    assertEquals(2, currRow.size());
5582    assertEquals("199996", Bytes.toString(currRow.get(0).getRowArray(),
5583      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5584    assertEquals("199995", Bytes.toString(currRow.get(1).getRowArray(),
5585      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5586  }
5587
5588  @Test
5589  public void testWriteRequestsCounter() throws IOException {
5590    byte[] fam = Bytes.toBytes("info");
5591    byte[][] families = { fam };
5592    this.region = initHRegion(tableName, method, CONF, families);
5593
5594    Assert.assertEquals(0L, region.getWriteRequestsCount());
5595
5596    Put put = new Put(row);
5597    put.addColumn(fam, fam, fam);
5598
5599    Assert.assertEquals(0L, region.getWriteRequestsCount());
5600    region.put(put);
5601    Assert.assertEquals(1L, region.getWriteRequestsCount());
5602    region.put(put);
5603    Assert.assertEquals(2L, region.getWriteRequestsCount());
5604    region.put(put);
5605    Assert.assertEquals(3L, region.getWriteRequestsCount());
5606
5607    region.delete(new Delete(row));
5608    Assert.assertEquals(4L, region.getWriteRequestsCount());
5609  }
5610
5611  @Test
5612  public void testOpenRegionWrittenToWAL() throws Exception {
5613    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
5614    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5615
5616    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
5617    htd.addFamily(new HColumnDescriptor(fam1));
5618    htd.addFamily(new HColumnDescriptor(fam2));
5619
5620    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
5621      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
5622
5623    // open the region w/o rss and wal and flush some files
5624    region =
5625         HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
5626             .getConfiguration(), htd);
5627    assertNotNull(region);
5628
5629    // create a file in fam1 for the region before opening in OpenRegionHandler
5630    region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1));
5631    region.flush(true);
5632    HBaseTestingUtility.closeRegionAndWAL(region);
5633
5634    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5635
5636    // capture append() calls
5637    WAL wal = mockWAL();
5638    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
5639
5640    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5641      TEST_UTIL.getConfiguration(), rss, null);
5642
5643    verify(wal, times(1)).append((HRegionInfo)any(), (WALKeyImpl)any()
5644      , editCaptor.capture(), anyBoolean());
5645
5646    WALEdit edit = editCaptor.getValue();
5647    assertNotNull(edit);
5648    assertNotNull(edit.getCells());
5649    assertEquals(1, edit.getCells().size());
5650    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5651    assertNotNull(desc);
5652
5653    LOG.info("RegionEventDescriptor from WAL: " + desc);
5654
5655    assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
5656    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
5657    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5658      hri.getEncodedNameAsBytes()));
5659    assertTrue(desc.getLogSequenceNumber() > 0);
5660    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5661    assertEquals(2, desc.getStoresCount());
5662
5663    StoreDescriptor store = desc.getStores(0);
5664    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
5665    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
5666    assertEquals(1, store.getStoreFileCount()); // 1store file
5667    assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
5668
5669    store = desc.getStores(1);
5670    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
5671    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
5672    assertEquals(0, store.getStoreFileCount()); // no store files
5673  }
5674
5675  // Helper for test testOpenRegionWrittenToWALForLogReplay
5676  static class HRegionWithSeqId extends HRegion {
5677    public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
5678        final Configuration confParam, final RegionInfo regionInfo,
5679        final TableDescriptor htd, final RegionServerServices rsServices) {
5680      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
5681    }
5682    @Override
5683    protected long getNextSequenceId(WAL wal) throws IOException {
5684      return 42;
5685    }
5686  }
5687
5688  @Test
5689  public void testFlushedFileWithNoTags() throws Exception {
5690    final TableName tableName = TableName.valueOf(name.getMethodName());
5691    HTableDescriptor htd = new HTableDescriptor(tableName);
5692    htd.addFamily(new HColumnDescriptor(fam1));
5693    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
5694    Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName());
5695    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
5696    Put put = new Put(Bytes.toBytes("a-b-0-0"));
5697    put.addColumn(fam1, qual1, Bytes.toBytes("c1-value"));
5698    region.put(put);
5699    region.flush(true);
5700    HStore store = region.getStore(fam1);
5701    Collection<HStoreFile> storefiles = store.getStorefiles();
5702    for (HStoreFile sf : storefiles) {
5703      assertFalse("Tags should not be present "
5704          ,sf.getReader().getHFileReader().getFileContext().isIncludesTags());
5705    }
5706  }
5707
5708  /**
5709   * Utility method to setup a WAL mock.
5710   * Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs.
5711   * @return a mock WAL
5712   * @throws IOException
5713   */
5714  private WAL mockWAL() throws IOException {
5715    WAL wal = mock(WAL.class);
5716    Mockito.when(wal.append((HRegionInfo)Mockito.any(),
5717        (WALKeyImpl)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())).
5718      thenAnswer(new Answer<Long>() {
5719        @Override
5720        public Long answer(InvocationOnMock invocation) throws Throwable {
5721          WALKeyImpl key = invocation.getArgument(1);
5722          MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
5723          key.setWriteEntry(we);
5724          return 1L;
5725        }
5726
5727    });
5728    return wal;
5729  }
5730
5731  @Test
5732  public void testCloseRegionWrittenToWAL() throws Exception {
5733
5734    Path rootDir = new Path(dir + name.getMethodName());
5735    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
5736
5737    final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
5738    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5739
5740    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
5741    htd.addFamily(new HColumnDescriptor(fam1));
5742    htd.addFamily(new HColumnDescriptor(fam2));
5743
5744    final HRegionInfo hri = new HRegionInfo(htd.getTableName(),
5745      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
5746
5747    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5748
5749    // capture append() calls
5750    WAL wal = mockWAL();
5751    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
5752
5753
5754    // create and then open a region first so that it can be closed later
5755    region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri));
5756    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5757      TEST_UTIL.getConfiguration(), rss, null);
5758
5759    // close the region
5760    region.close(false);
5761
5762    // 2 times, one for region open, the other close region
5763    verify(wal, times(2)).append((HRegionInfo)any(), (WALKeyImpl)any(),
5764      editCaptor.capture(), anyBoolean());
5765
5766    WALEdit edit = editCaptor.getAllValues().get(1);
5767    assertNotNull(edit);
5768    assertNotNull(edit.getCells());
5769    assertEquals(1, edit.getCells().size());
5770    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5771    assertNotNull(desc);
5772
5773    LOG.info("RegionEventDescriptor from WAL: " + desc);
5774
5775    assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType());
5776    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
5777    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5778      hri.getEncodedNameAsBytes()));
5779    assertTrue(desc.getLogSequenceNumber() > 0);
5780    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5781    assertEquals(2, desc.getStoresCount());
5782
5783    StoreDescriptor store = desc.getStores(0);
5784    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
5785    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
5786    assertEquals(0, store.getStoreFileCount()); // no store files
5787
5788    store = desc.getStores(1);
5789    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
5790    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
5791    assertEquals(0, store.getStoreFileCount()); // no store files
5792  }
5793
5794  /**
5795   * Test RegionTooBusyException thrown when region is busy
5796   */
5797  @Test
5798  public void testRegionTooBusy() throws IOException {
5799    byte[] family = Bytes.toBytes("family");
5800    long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration",
5801      HRegion.DEFAULT_BUSY_WAIT_DURATION);
5802    CONF.setLong("hbase.busy.wait.duration", 1000);
5803    region = initHRegion(tableName, method, CONF, family);
5804    final AtomicBoolean stopped = new AtomicBoolean(true);
5805    Thread t = new Thread(new Runnable() {
5806      @Override
5807      public void run() {
5808        try {
5809          region.lock.writeLock().lock();
5810          stopped.set(false);
5811          while (!stopped.get()) {
5812            Thread.sleep(100);
5813          }
5814        } catch (InterruptedException ie) {
5815        } finally {
5816          region.lock.writeLock().unlock();
5817        }
5818      }
5819    });
5820    t.start();
5821    Get get = new Get(row);
5822    try {
5823      while (stopped.get()) {
5824        Thread.sleep(100);
5825      }
5826      region.get(get);
5827      fail("Should throw RegionTooBusyException");
5828    } catch (InterruptedException ie) {
5829      fail("test interrupted");
5830    } catch (RegionTooBusyException e) {
5831      // Good, expected
5832    } finally {
5833      stopped.set(true);
5834      try {
5835        t.join();
5836      } catch (Throwable e) {
5837      }
5838
5839      HBaseTestingUtility.closeRegionAndWAL(region);
5840      region = null;
5841      CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration);
5842    }
5843  }
5844
5845  @Test
5846  public void testCellTTLs() throws IOException {
5847    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
5848    EnvironmentEdgeManager.injectEdge(edge);
5849
5850    final byte[] row = Bytes.toBytes("testRow");
5851    final byte[] q1 = Bytes.toBytes("q1");
5852    final byte[] q2 = Bytes.toBytes("q2");
5853    final byte[] q3 = Bytes.toBytes("q3");
5854    final byte[] q4 = Bytes.toBytes("q4");
5855
5856    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
5857    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
5858    hcd.setTimeToLive(10); // 10 seconds
5859    htd.addFamily(hcd);
5860
5861    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
5862    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
5863
5864    region = HBaseTestingUtility.createRegionAndWAL(new HRegionInfo(htd.getTableName(),
5865            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
5866        TEST_UTIL.getDataTestDir(), conf, htd);
5867    assertNotNull(region);
5868    long now = EnvironmentEdgeManager.currentTime();
5869    // Add a cell that will expire in 5 seconds via cell TTL
5870    region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
5871      HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
5872        // TTL tags specify ts in milliseconds
5873        new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) })));
5874    // Add a cell that will expire after 10 seconds via family setting
5875    region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
5876    // Add a cell that will expire in 15 seconds via cell TTL
5877    region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
5878      HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
5879        // TTL tags specify ts in milliseconds
5880        new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) })));
5881    // Add a cell that will expire in 20 seconds via family setting
5882    region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
5883
5884    // Flush so we are sure store scanning gets this right
5885    region.flush(true);
5886
5887    // A query at time T+0 should return all cells
5888    Result r = region.get(new Get(row));
5889    assertNotNull(r.getValue(fam1, q1));
5890    assertNotNull(r.getValue(fam1, q2));
5891    assertNotNull(r.getValue(fam1, q3));
5892    assertNotNull(r.getValue(fam1, q4));
5893
5894    // Increment time to T+5 seconds
5895    edge.incrementTime(5000);
5896
5897    r = region.get(new Get(row));
5898    assertNull(r.getValue(fam1, q1));
5899    assertNotNull(r.getValue(fam1, q2));
5900    assertNotNull(r.getValue(fam1, q3));
5901    assertNotNull(r.getValue(fam1, q4));
5902
5903    // Increment time to T+10 seconds
5904    edge.incrementTime(5000);
5905
5906    r = region.get(new Get(row));
5907    assertNull(r.getValue(fam1, q1));
5908    assertNull(r.getValue(fam1, q2));
5909    assertNotNull(r.getValue(fam1, q3));
5910    assertNotNull(r.getValue(fam1, q4));
5911
5912    // Increment time to T+15 seconds
5913    edge.incrementTime(5000);
5914
5915    r = region.get(new Get(row));
5916    assertNull(r.getValue(fam1, q1));
5917    assertNull(r.getValue(fam1, q2));
5918    assertNull(r.getValue(fam1, q3));
5919    assertNotNull(r.getValue(fam1, q4));
5920
5921    // Increment time to T+20 seconds
5922    edge.incrementTime(10000);
5923
5924    r = region.get(new Get(row));
5925    assertNull(r.getValue(fam1, q1));
5926    assertNull(r.getValue(fam1, q2));
5927    assertNull(r.getValue(fam1, q3));
5928    assertNull(r.getValue(fam1, q4));
5929
5930    // Fun with disappearing increments
5931
5932    // Start at 1
5933    region.put(new Put(row).addColumn(fam1, q1, Bytes.toBytes(1L)));
5934    r = region.get(new Get(row));
5935    byte[] val = r.getValue(fam1, q1);
5936    assertNotNull(val);
5937    assertEquals(1L, Bytes.toLong(val));
5938
5939    // Increment with a TTL of 5 seconds
5940    Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
5941    incr.setTTL(5000);
5942    region.increment(incr); // 2
5943
5944    // New value should be 2
5945    r = region.get(new Get(row));
5946    val = r.getValue(fam1, q1);
5947    assertNotNull(val);
5948    assertEquals(2L, Bytes.toLong(val));
5949
5950    // Increment time to T+25 seconds
5951    edge.incrementTime(5000);
5952
5953    // Value should be back to 1
5954    r = region.get(new Get(row));
5955    val = r.getValue(fam1, q1);
5956    assertNotNull(val);
5957    assertEquals(1L, Bytes.toLong(val));
5958
5959    // Increment time to T+30 seconds
5960    edge.incrementTime(5000);
5961
5962    // Original value written at T+20 should be gone now via family TTL
5963    r = region.get(new Get(row));
5964    assertNull(r.getValue(fam1, q1));
5965  }
5966
5967  @Test
5968  public void testIncrementTimestampsAreMonotonic() throws IOException {
5969    region = initHRegion(tableName, method, CONF, fam1);
5970    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
5971    EnvironmentEdgeManager.injectEdge(edge);
5972
5973    edge.setValue(10);
5974    Increment inc = new Increment(row);
5975    inc.setDurability(Durability.SKIP_WAL);
5976    inc.addColumn(fam1, qual1, 1L);
5977    region.increment(inc);
5978
5979    Result result = region.get(new Get(row));
5980    Cell c = result.getColumnLatestCell(fam1, qual1);
5981    assertNotNull(c);
5982    assertEquals(10L, c.getTimestamp());
5983
5984    edge.setValue(1); // clock goes back
5985    region.increment(inc);
5986    result = region.get(new Get(row));
5987    c = result.getColumnLatestCell(fam1, qual1);
5988    assertEquals(11L, c.getTimestamp());
5989    assertEquals(2L, Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
5990  }
5991
5992  @Test
5993  public void testAppendTimestampsAreMonotonic() throws IOException {
5994    region = initHRegion(tableName, method, CONF, fam1);
5995    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
5996    EnvironmentEdgeManager.injectEdge(edge);
5997
5998    edge.setValue(10);
5999    Append a = new Append(row);
6000    a.setDurability(Durability.SKIP_WAL);
6001    a.addColumn(fam1, qual1, qual1);
6002    region.append(a);
6003
6004    Result result = region.get(new Get(row));
6005    Cell c = result.getColumnLatestCell(fam1, qual1);
6006    assertNotNull(c);
6007    assertEquals(10L, c.getTimestamp());
6008
6009    edge.setValue(1); // clock goes back
6010    region.append(a);
6011    result = region.get(new Get(row));
6012    c = result.getColumnLatestCell(fam1, qual1);
6013    assertEquals(11L, c.getTimestamp());
6014
6015    byte[] expected = new byte[qual1.length*2];
6016    System.arraycopy(qual1, 0, expected, 0, qual1.length);
6017    System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
6018
6019    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6020      expected, 0, expected.length));
6021  }
6022
6023  @Test
6024  public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
6025    region = initHRegion(tableName, method, CONF, fam1);
6026    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6027    EnvironmentEdgeManager.injectEdge(edge);
6028
6029    edge.setValue(10);
6030    Put p = new Put(row);
6031    p.setDurability(Durability.SKIP_WAL);
6032    p.addColumn(fam1, qual1, qual1);
6033    region.put(p);
6034
6035    Result result = region.get(new Get(row));
6036    Cell c = result.getColumnLatestCell(fam1, qual1);
6037    assertNotNull(c);
6038    assertEquals(10L, c.getTimestamp());
6039
6040    edge.setValue(1); // clock goes back
6041    p = new Put(row);
6042    p.setDurability(Durability.SKIP_WAL);
6043    p.addColumn(fam1, qual1, qual2);
6044    region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p);
6045    result = region.get(new Get(row));
6046    c = result.getColumnLatestCell(fam1, qual1);
6047    assertEquals(10L, c.getTimestamp());
6048
6049    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6050      qual2, 0, qual2.length));
6051  }
6052
6053  @Test
6054  public void testBatchMutateWithWrongRegionException() throws Exception {
6055    final byte[] a = Bytes.toBytes("a");
6056    final byte[] b = Bytes.toBytes("b");
6057    final byte[] c = Bytes.toBytes("c"); // exclusive
6058
6059    int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
6060    CONF.setInt("hbase.rowlock.wait.duration", 1000);
6061    region = initHRegion(tableName, a, c, method, CONF, false, fam1);
6062
6063    Mutation[] mutations = new Mutation[] {
6064        new Put(a)
6065            .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6066              .setRow(a)
6067              .setFamily(fam1)
6068              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6069              .setType(Cell.Type.Put)
6070              .build()),
6071        // this is outside the region boundary
6072        new Put(c).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6073              .setRow(c)
6074              .setFamily(fam1)
6075              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6076              .setType(Type.Put)
6077              .build()),
6078        new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6079              .setRow(b)
6080              .setFamily(fam1)
6081              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6082              .setType(Cell.Type.Put)
6083              .build())
6084    };
6085
6086    OperationStatus[] status = region.batchMutate(mutations);
6087    assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6088    assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, status[1].getOperationStatusCode());
6089    assertEquals(OperationStatusCode.SUCCESS, status[2].getOperationStatusCode());
6090
6091
6092    // test with a row lock held for a long time
6093    final CountDownLatch obtainedRowLock = new CountDownLatch(1);
6094    ExecutorService exec = Executors.newFixedThreadPool(2);
6095    Future<Void> f1 = exec.submit(new Callable<Void>() {
6096      @Override
6097      public Void call() throws Exception {
6098        LOG.info("Acquiring row lock");
6099        RowLock rl = region.getRowLock(b);
6100        obtainedRowLock.countDown();
6101        LOG.info("Waiting for 5 seconds before releasing lock");
6102        Threads.sleep(5000);
6103        LOG.info("Releasing row lock");
6104        rl.release();
6105        return null;
6106      }
6107    });
6108    obtainedRowLock.await(30, TimeUnit.SECONDS);
6109
6110    Future<Void> f2 = exec.submit(new Callable<Void>() {
6111      @Override
6112      public Void call() throws Exception {
6113        Mutation[] mutations = new Mutation[] {
6114            new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6115                .setRow(a)
6116                .setFamily(fam1)
6117                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6118                .setType(Cell.Type.Put)
6119                .build()),
6120            new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6121                .setRow(b)
6122                .setFamily(fam1)
6123                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6124                .setType(Cell.Type.Put)
6125                .build()),
6126        };
6127
6128        // this will wait for the row lock, and it will eventually succeed
6129        OperationStatus[] status = region.batchMutate(mutations);
6130        assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6131        assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode());
6132        return null;
6133      }
6134    });
6135
6136    f1.get();
6137    f2.get();
6138
6139    CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
6140  }
6141
6142  @Test
6143  public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
6144    region = initHRegion(tableName, method, CONF, fam1);
6145    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6146    EnvironmentEdgeManager.injectEdge(edge);
6147
6148    edge.setValue(10);
6149    Put p = new Put(row);
6150    p.setDurability(Durability.SKIP_WAL);
6151    p.addColumn(fam1, qual1, qual1);
6152    region.put(p);
6153
6154    Result result = region.get(new Get(row));
6155    Cell c = result.getColumnLatestCell(fam1, qual1);
6156    assertNotNull(c);
6157    assertEquals(10L, c.getTimestamp());
6158
6159    edge.setValue(1); // clock goes back
6160    p = new Put(row);
6161    p.setDurability(Durability.SKIP_WAL);
6162    p.addColumn(fam1, qual1, qual2);
6163    RowMutations rm = new RowMutations(row);
6164    rm.add(p);
6165    assertTrue(region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL,
6166        new BinaryComparator(qual1), rm));
6167    result = region.get(new Get(row));
6168    c = result.getColumnLatestCell(fam1, qual1);
6169    assertEquals(10L, c.getTimestamp());
6170    LOG.info("c value " +
6171      Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6172
6173    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6174      qual2, 0, qual2.length));
6175  }
6176
6177  HRegion initHRegion(TableName tableName, String callingMethod,
6178      byte[]... families) throws IOException {
6179    return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
6180        families);
6181  }
6182
6183  /**
6184   * HBASE-16429 Make sure no stuck if roll writer when ring buffer is filled with appends
6185   * @throws IOException if IO error occurred during test
6186   */
6187  @Test
6188  public void testWritesWhileRollWriter() throws IOException {
6189    int testCount = 10;
6190    int numRows = 1024;
6191    int numFamilies = 2;
6192    int numQualifiers = 2;
6193    final byte[][] families = new byte[numFamilies][];
6194    for (int i = 0; i < numFamilies; i++) {
6195      families[i] = Bytes.toBytes("family" + i);
6196    }
6197    final byte[][] qualifiers = new byte[numQualifiers][];
6198    for (int i = 0; i < numQualifiers; i++) {
6199      qualifiers[i] = Bytes.toBytes("qual" + i);
6200    }
6201
6202    CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
6203    this.region = initHRegion(tableName, method, CONF, families);
6204    try {
6205      List<Thread> threads = new ArrayList<>();
6206      for (int i = 0; i < numRows; i++) {
6207        final int count = i;
6208        Thread t = new Thread(new Runnable() {
6209
6210          @Override
6211          public void run() {
6212            byte[] row = Bytes.toBytes("row" + count);
6213            Put put = new Put(row);
6214            put.setDurability(Durability.SYNC_WAL);
6215            byte[] value = Bytes.toBytes(String.valueOf(count));
6216            for (byte[] family : families) {
6217              for (byte[] qualifier : qualifiers) {
6218                put.addColumn(family, qualifier, count, value);
6219              }
6220            }
6221            try {
6222              region.put(put);
6223            } catch (IOException e) {
6224              throw new RuntimeException(e);
6225            }
6226          }
6227        });
6228        threads.add(t);
6229      }
6230      for (Thread t : threads) {
6231        t.start();
6232      }
6233
6234      for (int i = 0; i < testCount; i++) {
6235        region.getWAL().rollWriter();
6236        Thread.yield();
6237      }
6238    } finally {
6239      try {
6240        HBaseTestingUtility.closeRegionAndWAL(this.region);
6241        CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
6242      } catch (DroppedSnapshotException dse) {
6243        // We could get this on way out because we interrupt the background flusher and it could
6244        // fail anywhere causing a DSE over in the background flusher... only it is not properly
6245        // dealt with so could still be memory hanging out when we get to here -- memory we can't
6246        // flush because the accounting is 'off' since original DSE.
6247      }
6248      this.region = null;
6249    }
6250  }
6251
6252  @Test
6253  public void testMutateRow_WriteRequestCount() throws Exception {
6254    byte[] row1 = Bytes.toBytes("row1");
6255    byte[] fam1 = Bytes.toBytes("fam1");
6256    byte[] qf1 = Bytes.toBytes("qualifier");
6257    byte[] val1 = Bytes.toBytes("value1");
6258
6259    RowMutations rm = new RowMutations(row1);
6260    Put put = new Put(row1);
6261    put.addColumn(fam1, qf1, val1);
6262    rm.add(put);
6263
6264    this.region = initHRegion(tableName, method, CONF, fam1);
6265    long wrcBeforeMutate = this.region.writeRequestsCount.longValue();
6266    this.region.mutateRow(rm);
6267    long wrcAfterMutate = this.region.writeRequestsCount.longValue();
6268    Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate);
6269  }
6270
6271  @Test
6272  public void testBulkLoadReplicationEnabled() throws IOException {
6273    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
6274    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
6275    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6276
6277    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
6278    htd.addFamily(new HColumnDescriptor(fam1));
6279    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6280        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6281    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(),
6282        rss, null);
6283
6284    assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false));
6285    String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
6286    String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();
6287    assertTrue(plugins.contains(replicationCoprocessorClass));
6288    assertTrue(region.getCoprocessorHost().
6289        getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
6290  }
6291
6292  /**
6293   * The same as HRegion class, the only difference is that instantiateHStore will
6294   * create a different HStore - HStoreForTesting. [HBASE-8518]
6295   */
6296  public static class HRegionForTesting extends HRegion {
6297
6298    public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs,
6299                             final Configuration confParam, final RegionInfo regionInfo,
6300                             final TableDescriptor htd, final RegionServerServices rsServices) {
6301      this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
6302          wal, confParam, htd, rsServices);
6303    }
6304
6305    public HRegionForTesting(HRegionFileSystem fs, WAL wal,
6306                             Configuration confParam, TableDescriptor htd,
6307                             RegionServerServices rsServices) {
6308      super(fs, wal, confParam, htd, rsServices);
6309    }
6310
6311    /**
6312     * Create HStore instance.
6313     * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting.
6314     */
6315    @Override
6316    protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
6317        throws IOException {
6318      if (family.isMobEnabled()) {
6319        if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
6320          throw new IOException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS +
6321              " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY +
6322              " accordingly.");
6323        }
6324        return new HMobStore(this, family, this.conf, warmup);
6325      }
6326      return new HStoreForTesting(this, family, this.conf, warmup);
6327    }
6328  }
6329
6330  /**
6331   * HStoreForTesting is merely the same as HStore, the difference is in the doCompaction method
6332   * of HStoreForTesting there is a checkpoint "hbase.hstore.compaction.complete" which
6333   * doesn't let hstore compaction complete. In the former edition, this config is set in
6334   * HStore class inside compact method, though this is just for testing, otherwise it
6335   * doesn't do any help. In HBASE-8518, we try to get rid of all "hbase.hstore.compaction.complete"
6336   * config (except for testing code).
6337   */
6338  public static class HStoreForTesting extends HStore {
6339
6340    protected HStoreForTesting(final HRegion region,
6341        final ColumnFamilyDescriptor family,
6342        final Configuration confParam, boolean warmup) throws IOException {
6343      super(region, family, confParam, warmup);
6344    }
6345
6346    @Override
6347    protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
6348        Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
6349        List<Path> newFiles) throws IOException {
6350      // let compaction incomplete.
6351      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
6352        LOG.warn("hbase.hstore.compaction.complete is set to false");
6353        List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
6354        final boolean evictOnClose =
6355            cacheConf != null? cacheConf.shouldEvictOnClose(): true;
6356        for (Path newFile : newFiles) {
6357          // Create storefile around what we wrote with a reader on it.
6358          HStoreFile sf = createStoreFileAndReader(newFile);
6359          sf.closeStoreFile(evictOnClose);
6360          sfs.add(sf);
6361        }
6362        return sfs;
6363      }
6364      return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles);
6365    }
6366  }
6367}