001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
023import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
024import static org.junit.Assert.assertArrayEquals;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertFalse;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertNull;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031import static org.mockito.ArgumentMatchers.any;
032import static org.mockito.ArgumentMatchers.anyBoolean;
033import static org.mockito.ArgumentMatchers.anyLong;
034import static org.mockito.Mockito.doThrow;
035import static org.mockito.Mockito.mock;
036import static org.mockito.Mockito.never;
037import static org.mockito.Mockito.spy;
038import static org.mockito.Mockito.times;
039import static org.mockito.Mockito.verify;
040import static org.mockito.Mockito.when;
041
042import java.io.IOException;
043import java.io.InterruptedIOException;
044import java.math.BigDecimal;
045import java.nio.charset.StandardCharsets;
046import java.security.PrivilegedExceptionAction;
047import java.util.ArrayList;
048import java.util.Arrays;
049import java.util.Collection;
050import java.util.List;
051import java.util.Map;
052import java.util.NavigableMap;
053import java.util.Objects;
054import java.util.TreeMap;
055import java.util.UUID;
056import java.util.concurrent.Callable;
057import java.util.concurrent.CountDownLatch;
058import java.util.concurrent.ExecutorService;
059import java.util.concurrent.Executors;
060import java.util.concurrent.Future;
061import java.util.concurrent.TimeUnit;
062import java.util.concurrent.atomic.AtomicBoolean;
063import java.util.concurrent.atomic.AtomicInteger;
064import java.util.concurrent.atomic.AtomicReference;
065import org.apache.commons.lang3.RandomStringUtils;
066import org.apache.hadoop.conf.Configuration;
067import org.apache.hadoop.fs.FSDataOutputStream;
068import org.apache.hadoop.fs.FileStatus;
069import org.apache.hadoop.fs.FileSystem;
070import org.apache.hadoop.fs.Path;
071import org.apache.hadoop.hbase.ArrayBackedTag;
072import org.apache.hadoop.hbase.Cell;
073import org.apache.hadoop.hbase.Cell.Type;
074import org.apache.hadoop.hbase.CellBuilderFactory;
075import org.apache.hadoop.hbase.CellBuilderType;
076import org.apache.hadoop.hbase.CellUtil;
077import org.apache.hadoop.hbase.CompareOperator;
078import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
079import org.apache.hadoop.hbase.DroppedSnapshotException;
080import org.apache.hadoop.hbase.HBaseClassTestRule;
081import org.apache.hadoop.hbase.HBaseConfiguration;
082import org.apache.hadoop.hbase.HBaseTestingUtility;
083import org.apache.hadoop.hbase.HColumnDescriptor;
084import org.apache.hadoop.hbase.HConstants;
085import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
086import org.apache.hadoop.hbase.HDFSBlocksDistribution;
087import org.apache.hadoop.hbase.HRegionInfo;
088import org.apache.hadoop.hbase.HTableDescriptor;
089import org.apache.hadoop.hbase.KeyValue;
090import org.apache.hadoop.hbase.KeyValueUtil;
091import org.apache.hadoop.hbase.MiniHBaseCluster;
092import org.apache.hadoop.hbase.MultithreadedTestUtil;
093import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
094import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
095import org.apache.hadoop.hbase.NotServingRegionException;
096import org.apache.hadoop.hbase.PrivateCellUtil;
097import org.apache.hadoop.hbase.RegionTooBusyException;
098import org.apache.hadoop.hbase.ServerName;
099import org.apache.hadoop.hbase.TableName;
100import org.apache.hadoop.hbase.TagType;
101import org.apache.hadoop.hbase.Waiter;
102import org.apache.hadoop.hbase.client.Append;
103import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
104import org.apache.hadoop.hbase.client.Delete;
105import org.apache.hadoop.hbase.client.Durability;
106import org.apache.hadoop.hbase.client.Get;
107import org.apache.hadoop.hbase.client.Increment;
108import org.apache.hadoop.hbase.client.Mutation;
109import org.apache.hadoop.hbase.client.Put;
110import org.apache.hadoop.hbase.client.RegionInfo;
111import org.apache.hadoop.hbase.client.RegionInfoBuilder;
112import org.apache.hadoop.hbase.client.Result;
113import org.apache.hadoop.hbase.client.RowMutations;
114import org.apache.hadoop.hbase.client.Scan;
115import org.apache.hadoop.hbase.client.Table;
116import org.apache.hadoop.hbase.client.TableDescriptor;
117import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
118import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
119import org.apache.hadoop.hbase.filter.BigDecimalComparator;
120import org.apache.hadoop.hbase.filter.BinaryComparator;
121import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
122import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
123import org.apache.hadoop.hbase.filter.Filter;
124import org.apache.hadoop.hbase.filter.FilterBase;
125import org.apache.hadoop.hbase.filter.FilterList;
126import org.apache.hadoop.hbase.filter.NullComparator;
127import org.apache.hadoop.hbase.filter.PrefixFilter;
128import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
129import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
130import org.apache.hadoop.hbase.filter.SubstringComparator;
131import org.apache.hadoop.hbase.filter.ValueFilter;
132import org.apache.hadoop.hbase.io.hfile.HFile;
133import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
134import org.apache.hadoop.hbase.monitoring.MonitoredTask;
135import org.apache.hadoop.hbase.monitoring.TaskMonitor;
136import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
137import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
138import org.apache.hadoop.hbase.regionserver.Region.RowLock;
139import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
140import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
141import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
142import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
143import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
144import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
145import org.apache.hadoop.hbase.security.User;
146import org.apache.hadoop.hbase.test.MetricsAssertHelper;
147import org.apache.hadoop.hbase.testclassification.LargeTests;
148import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
149import org.apache.hadoop.hbase.util.Bytes;
150import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
151import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
152import org.apache.hadoop.hbase.util.FSUtils;
153import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
154import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
155import org.apache.hadoop.hbase.util.Threads;
156import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
157import org.apache.hadoop.hbase.wal.FaultyFSLog;
158import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
159import org.apache.hadoop.hbase.wal.WAL;
160import org.apache.hadoop.hbase.wal.WALEdit;
161import org.apache.hadoop.hbase.wal.WALFactory;
162import org.apache.hadoop.hbase.wal.WALKeyImpl;
163import org.apache.hadoop.hbase.wal.WALProvider;
164import org.apache.hadoop.hbase.wal.WALProvider.Writer;
165import org.apache.hadoop.hbase.wal.WALSplitter;
166import org.junit.After;
167import org.junit.Assert;
168import org.junit.Before;
169import org.junit.ClassRule;
170import org.junit.Rule;
171import org.junit.Test;
172import org.junit.experimental.categories.Category;
173import org.junit.rules.ExpectedException;
174import org.junit.rules.TestName;
175import org.mockito.ArgumentCaptor;
176import org.mockito.ArgumentMatcher;
177import org.mockito.Mockito;
178import org.mockito.invocation.InvocationOnMock;
179import org.mockito.stubbing.Answer;
180import org.slf4j.Logger;
181import org.slf4j.LoggerFactory;
182
183import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
184import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
185import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
186import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
187import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
188
189import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
196
197/**
198 * Basic stand-alone testing of HRegion.  No clusters!
199 *
200 * A lot of the meta information for an HRegion now lives inside other HRegions
201 * or in the HBaseMaster, so only basic testing is possible.
202 */
203@Category({VerySlowRegionServerTests.class, LargeTests.class})
204@SuppressWarnings("deprecation")
205public class TestHRegion {
206
207  @ClassRule
208  public static final HBaseClassTestRule CLASS_RULE =
209      HBaseClassTestRule.forClass(TestHRegion.class);
210
211  // Do not spin up clusters in here. If you need to spin up a cluster, do it
212  // over in TestHRegionOnCluster.
213  private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class);
214  @Rule
215  public TestName name = new TestName();
216  @Rule public final ExpectedException thrown = ExpectedException.none();
217
218  private static final String COLUMN_FAMILY = "MyCF";
219  private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
220  private static final EventLoopGroup GROUP = new NioEventLoopGroup();
221
222  HRegion region = null;
223  // Do not run unit tests in parallel (? Why not?  It don't work?  Why not?  St.Ack)
224  protected static HBaseTestingUtility TEST_UTIL;
225  public static Configuration CONF ;
226  private String dir;
227  private static FileSystem FILESYSTEM;
228  private final int MAX_VERSIONS = 2;
229
230  // Test names
231  protected TableName tableName;
232  protected String method;
233  protected final byte[] qual = Bytes.toBytes("qual");
234  protected final byte[] qual1 = Bytes.toBytes("qual1");
235  protected final byte[] qual2 = Bytes.toBytes("qual2");
236  protected final byte[] qual3 = Bytes.toBytes("qual3");
237  protected final byte[] value = Bytes.toBytes("value");
238  protected final byte[] value1 = Bytes.toBytes("value1");
239  protected final byte[] value2 = Bytes.toBytes("value2");
240  protected final byte[] row = Bytes.toBytes("rowA");
241  protected final byte[] row2 = Bytes.toBytes("rowB");
242
243  protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory
244      .getInstance(MetricsAssertHelper.class);
245
246  @Before
247  public void setup() throws IOException {
248    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
249    FILESYSTEM = TEST_UTIL.getTestFileSystem();
250    CONF = TEST_UTIL.getConfiguration();
251    NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, GROUP, NioSocketChannel.class);
252    dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
253    method = name.getMethodName();
254    tableName = TableName.valueOf(method);
255    CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09));
256  }
257
258  @After
259  public void tearDown() throws Exception {
260    EnvironmentEdgeManagerTestHelper.reset();
261    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
262    TEST_UTIL.cleanupTestDir();
263  }
264
265  /**
266   * Test that I can use the max flushed sequence id after the close.
267   * @throws IOException
268   */
269  @Test
270  public void testSequenceId() throws IOException {
271    HRegion region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
272    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
273    // Weird. This returns 0 if no store files or no edits. Afraid to change it.
274    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
275    region.close();
276    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
277    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
278    // Open region again.
279    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
280    byte [] value = Bytes.toBytes(method);
281    // Make a random put against our cf.
282    Put put = new Put(value);
283    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
284    region.put(put);
285    // No flush yet so init numbers should still be in place.
286    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
287    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
288    region.flush(true);
289    long max = region.getMaxFlushedSeqId();
290    region.close();
291    assertEquals(max, region.getMaxFlushedSeqId());
292  }
293
294  /**
295   * Test for Bug 2 of HBASE-10466.
296   * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
297   * is smaller than a certain value, or when region close starts a flush is ongoing, the first
298   * flush is skipped and only the second flush takes place. However, two flushes are required in
299   * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
300   * in current memstore. The fix is removing all conditions except abort check so we ensure 2
301   * flushes for region close."
302   * @throws IOException
303   */
304  @Test
305  public void testCloseCarryingSnapshot() throws IOException {
306    HRegion region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
307    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
308    // Get some random bytes.
309    byte [] value = Bytes.toBytes(method);
310    // Make a random put against our cf.
311    Put put = new Put(value);
312    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
313    // First put something in current memstore, which will be in snapshot after flusher.prepare()
314    region.put(put);
315    StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
316    storeFlushCtx.prepare();
317    // Second put something in current memstore
318    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
319    region.put(put);
320    // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
321    region.close();
322    assertEquals(0, region.getMemStoreDataSize());
323    HBaseTestingUtility.closeRegionAndWAL(region);
324  }
325
326  /*
327   * This test is for verifying memstore snapshot size is correctly updated in case of rollback
328   * See HBASE-10845
329   */
330  @Test
331  public void testMemstoreSnapshotSize() throws IOException {
332    class MyFaultyFSLog extends FaultyFSLog {
333      StoreFlushContext storeFlushCtx;
334      public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
335          throws IOException {
336        super(fs, rootDir, logName, conf);
337      }
338
339      void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
340        this.storeFlushCtx = storeFlushCtx;
341      }
342
343      @Override
344      public void sync(long txid) throws IOException {
345        storeFlushCtx.prepare();
346        super.sync(txid);
347      }
348    }
349
350    FileSystem fs = FileSystem.get(CONF);
351    Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
352    MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
353    HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog,
354        COLUMN_FAMILY_BYTES);
355
356    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
357    // Get some random bytes.
358    byte [] value = Bytes.toBytes(method);
359    faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY));
360
361    Put put = new Put(value);
362    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
363    faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
364
365    boolean threwIOE = false;
366    try {
367      region.put(put);
368    } catch (IOException ioe) {
369      threwIOE = true;
370    } finally {
371      assertTrue("The regionserver should have thrown an exception", threwIOE);
372    }
373    MemStoreSize mss = store.getFlushableSize();
374    assertTrue("flushable size should be zero, but it is " + mss,
375        mss.getDataSize() == 0);
376    HBaseTestingUtility.closeRegionAndWAL(region);
377  }
378
379  /**
380   * Create a WAL outside of the usual helper in
381   * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method
382   * doesn't play nicely with FaultyFileSystem. Call this method before overriding
383   * {@code fs.file.impl}.
384   * @param callingMethod a unique component for the path, probably the name of the test method.
385   */
386  private static WAL createWALCompatibleWithFaultyFileSystem(String callingMethod,
387      Configuration conf, TableName tableName) throws IOException {
388    final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
389    final Configuration walConf = new Configuration(conf);
390    FSUtils.setRootDir(walConf, logDir);
391    return new WALFactory(walConf, callingMethod)
392        .getWAL(RegionInfoBuilder.newBuilder(tableName).build());
393  }
394
395  @Test
396  public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
397    String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
398    FileSystem fs = FileSystem.get(CONF);
399    Path rootDir = new Path(dir + testName);
400    FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
401    HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
402        COLUMN_FAMILY_BYTES);
403    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
404    assertEquals(0, region.getMemStoreDataSize());
405
406    // Put one value
407    byte [] value = Bytes.toBytes(method);
408    Put put = new Put(value);
409    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
410    region.put(put);
411    long onePutSize = region.getMemStoreDataSize();
412    assertTrue(onePutSize > 0);
413
414    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
415    doThrow(new IOException())
416       .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
417    region.setCoprocessorHost(mockedCPHost);
418
419    put = new Put(value);
420    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
421    try {
422      region.put(put);
423      fail("Should have failed with IOException");
424    } catch (IOException expected) {
425    }
426    long expectedSize = onePutSize * 2;
427    assertEquals("memstoreSize should be incremented",
428        expectedSize, region.getMemStoreDataSize());
429    assertEquals("flushable size should be incremented",
430        expectedSize, store.getFlushableSize().getDataSize());
431
432    region.setCoprocessorHost(null);
433    HBaseTestingUtility.closeRegionAndWAL(region);
434  }
435
436  /**
437   * A test case of HBASE-21041
438   * @throws Exception Exception
439   */
440  @Test
441  public void testFlushAndMemstoreSizeCounting() throws Exception {
442    byte[] family = Bytes.toBytes("family");
443    this.region = initHRegion(tableName, method, CONF, family);
444    final WALFactory wals = new WALFactory(CONF, method);
445    try {
446      for (byte[] row : HBaseTestingUtility.ROWS) {
447        Put put = new Put(row);
448        put.addColumn(family, family, row);
449        region.put(put);
450      }
451      region.flush(true);
452      // After flush, data size should be zero
453      assertEquals(0, region.getMemStoreDataSize());
454      // After flush, a new active mutable segment is created, so the heap size
455      // should equal to MutableSegment.DEEP_OVERHEAD
456      assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
457      // After flush, offheap should be zero
458      assertEquals(0, region.getMemStoreOffHeapSize());
459    } finally {
460      HBaseTestingUtility.closeRegionAndWAL(this.region);
461      this.region = null;
462      wals.close();
463    }
464  }
465
466  /**
467   * Test we do not lose data if we fail a flush and then close.
468   * Part of HBase-10466.  Tests the following from the issue description:
469   * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
470   * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
471   * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
472   * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
473   * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
474   * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
475   * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
476   * much smaller than expected. In extreme case, if the error accumulates to even bigger than
477   * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
478   * if memstoreSize is not larger than 0."
479   * @throws Exception
480   */
481  @Test
482  public void testFlushSizeAccounting() throws Exception {
483    final Configuration conf = HBaseConfiguration.create(CONF);
484    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
485    // Only retry once.
486    conf.setInt("hbase.hstore.flush.retries.number", 1);
487    final User user =
488      User.createUserForTesting(conf, method, new String[]{"foo"});
489    // Inject our faulty LocalFileSystem
490    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
491    user.runAs(new PrivilegedExceptionAction<Object>() {
492      @Override
493      public Object run() throws Exception {
494        // Make sure it worked (above is sensitive to caching details in hadoop core)
495        FileSystem fs = FileSystem.get(conf);
496        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
497        FaultyFileSystem ffs = (FaultyFileSystem)fs;
498        HRegion region = null;
499        try {
500          // Initialize region
501          region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal,
502              COLUMN_FAMILY_BYTES);
503          long size = region.getMemStoreDataSize();
504          Assert.assertEquals(0, size);
505          // Put one item into memstore.  Measure the size of one item in memstore.
506          Put p1 = new Put(row);
507          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null));
508          region.put(p1);
509          final long sizeOfOnePut = region.getMemStoreDataSize();
510          // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
511          try {
512            LOG.info("Flushing");
513            region.flush(true);
514            Assert.fail("Didn't bubble up IOE!");
515          } catch (DroppedSnapshotException dse) {
516            // What we are expecting
517            region.closing.set(false); // this is needed for the rest of the test to work
518          }
519          // Make it so all writes succeed from here on out
520          ffs.fault.set(false);
521          // Check sizes.  Should still be the one entry.
522          Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize());
523          // Now add two entries so that on this next flush that fails, we can see if we
524          // subtract the right amount, the snapshot size only.
525          Put p2 = new Put(row);
526          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
527          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
528          region.put(p2);
529          long expectedSize = sizeOfOnePut * 3;
530          Assert.assertEquals(expectedSize, region.getMemStoreDataSize());
531          // Do a successful flush.  It will clear the snapshot only.  Thats how flushes work.
532          // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
533          // it
534          region.flush(true);
535          // Make sure our memory accounting is right.
536          Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize());
537        } finally {
538          HBaseTestingUtility.closeRegionAndWAL(region);
539        }
540        return null;
541      }
542    });
543    FileSystem.closeAllForUGI(user.getUGI());
544  }
545
546  @Test
547  public void testCloseWithFailingFlush() throws Exception {
548    final Configuration conf = HBaseConfiguration.create(CONF);
549    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
550    // Only retry once.
551    conf.setInt("hbase.hstore.flush.retries.number", 1);
552    final User user =
553      User.createUserForTesting(conf, this.method, new String[]{"foo"});
554    // Inject our faulty LocalFileSystem
555    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
556    user.runAs(new PrivilegedExceptionAction<Object>() {
557      @Override
558      public Object run() throws Exception {
559        // Make sure it worked (above is sensitive to caching details in hadoop core)
560        FileSystem fs = FileSystem.get(conf);
561        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
562        FaultyFileSystem ffs = (FaultyFileSystem)fs;
563        HRegion region = null;
564        try {
565          // Initialize region
566          region = initHRegion(tableName, null, null, false,
567              Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
568          long size = region.getMemStoreDataSize();
569          Assert.assertEquals(0, size);
570          // Put one item into memstore.  Measure the size of one item in memstore.
571          Put p1 = new Put(row);
572          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
573          region.put(p1);
574          // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
575          HStore store = region.getStore(COLUMN_FAMILY_BYTES);
576          StoreFlushContext storeFlushCtx =
577              store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
578          storeFlushCtx.prepare();
579          // Now add two entries to the foreground memstore.
580          Put p2 = new Put(row);
581          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
582          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
583          region.put(p2);
584          // Now try close on top of a failing flush.
585          region.close();
586          fail();
587        } catch (DroppedSnapshotException dse) {
588          // Expected
589          LOG.info("Expected DroppedSnapshotException");
590        } finally {
591          // Make it so all writes succeed from here on out so can close clean
592          ffs.fault.set(false);
593          HBaseTestingUtility.closeRegionAndWAL(region);
594        }
595        return null;
596      }
597    });
598    FileSystem.closeAllForUGI(user.getUGI());
599  }
600
601  @Test
602  public void testCompactionAffectedByScanners() throws Exception {
603    byte[] family = Bytes.toBytes("family");
604    this.region = initHRegion(tableName, method, CONF, family);
605
606    Put put = new Put(Bytes.toBytes("r1"));
607    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
608    region.put(put);
609    region.flush(true);
610
611    Scan scan = new Scan();
612    scan.setMaxVersions(3);
613    // open the first scanner
614    RegionScanner scanner1 = region.getScanner(scan);
615
616    Delete delete = new Delete(Bytes.toBytes("r1"));
617    region.delete(delete);
618    region.flush(true);
619
620    // open the second scanner
621    RegionScanner scanner2 = region.getScanner(scan);
622
623    List<Cell> results = new ArrayList<>();
624
625    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
626
627    // make a major compaction
628    region.compact(true);
629
630    // open the third scanner
631    RegionScanner scanner3 = region.getScanner(scan);
632
633    // get data from scanner 1, 2, 3 after major compaction
634    scanner1.next(results);
635    System.out.println(results);
636    assertEquals(1, results.size());
637
638    results.clear();
639    scanner2.next(results);
640    System.out.println(results);
641    assertEquals(0, results.size());
642
643    results.clear();
644    scanner3.next(results);
645    System.out.println(results);
646    assertEquals(0, results.size());
647  }
648
649  @Test
650  public void testToShowNPEOnRegionScannerReseek() throws Exception {
651    byte[] family = Bytes.toBytes("family");
652    this.region = initHRegion(tableName, method, CONF, family);
653
654    Put put = new Put(Bytes.toBytes("r1"));
655    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
656    region.put(put);
657    put = new Put(Bytes.toBytes("r2"));
658    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
659    region.put(put);
660    region.flush(true);
661
662    Scan scan = new Scan();
663    scan.setMaxVersions(3);
664    // open the first scanner
665    RegionScanner scanner1 = region.getScanner(scan);
666
667    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
668
669    region.compact(true);
670
671    scanner1.reseek(Bytes.toBytes("r2"));
672    List<Cell> results = new ArrayList<>();
673    scanner1.next(results);
674    Cell keyValue = results.get(0);
675    Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0);
676    scanner1.close();
677  }
678
679  @Test
680  public void testSkipRecoveredEditsReplay() throws Exception {
681    byte[] family = Bytes.toBytes("family");
682    this.region = initHRegion(tableName, method, CONF, family);
683    final WALFactory wals = new WALFactory(CONF, method);
684    try {
685      Path regiondir = region.getRegionFileSystem().getRegionDir();
686      FileSystem fs = region.getRegionFileSystem().getFileSystem();
687      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
688
689      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
690
691      long maxSeqId = 1050;
692      long minSeqId = 1000;
693
694      for (long i = minSeqId; i <= maxSeqId; i += 10) {
695        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
696        fs.create(recoveredEdits);
697        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
698
699        long time = System.nanoTime();
700        WALEdit edit = new WALEdit();
701        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
702            .toBytes(i)));
703        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
704            HConstants.DEFAULT_CLUSTER_ID), edit));
705
706        writer.close();
707      }
708      MonitoredTask status = TaskMonitor.get().createStatus(method);
709      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
710      for (HStore store : region.getStores()) {
711        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
712      }
713      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
714      assertEquals(maxSeqId, seqId);
715      region.getMVCC().advanceTo(seqId);
716      Get get = new Get(row);
717      Result result = region.get(get);
718      for (long i = minSeqId; i <= maxSeqId; i += 10) {
719        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
720        assertEquals(1, kvs.size());
721        assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
722      }
723    } finally {
724      HBaseTestingUtility.closeRegionAndWAL(this.region);
725      this.region = null;
726      wals.close();
727    }
728  }
729
730  @Test
731  public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
732    byte[] family = Bytes.toBytes("family");
733    this.region = initHRegion(tableName, method, CONF, family);
734    final WALFactory wals = new WALFactory(CONF, method);
735    try {
736      Path regiondir = region.getRegionFileSystem().getRegionDir();
737      FileSystem fs = region.getRegionFileSystem().getFileSystem();
738      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
739
740      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
741
742      long maxSeqId = 1050;
743      long minSeqId = 1000;
744
745      for (long i = minSeqId; i <= maxSeqId; i += 10) {
746        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
747        fs.create(recoveredEdits);
748        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
749
750        long time = System.nanoTime();
751        WALEdit edit = new WALEdit();
752        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
753            .toBytes(i)));
754        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
755            HConstants.DEFAULT_CLUSTER_ID), edit));
756
757        writer.close();
758      }
759      long recoverSeqId = 1030;
760      MonitoredTask status = TaskMonitor.get().createStatus(method);
761      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
762      for (HStore store : region.getStores()) {
763        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
764      }
765      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
766      assertEquals(maxSeqId, seqId);
767      region.getMVCC().advanceTo(seqId);
768      Get get = new Get(row);
769      Result result = region.get(get);
770      for (long i = minSeqId; i <= maxSeqId; i += 10) {
771        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
772        if (i < recoverSeqId) {
773          assertEquals(0, kvs.size());
774        } else {
775          assertEquals(1, kvs.size());
776          assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
777        }
778      }
779    } finally {
780      HBaseTestingUtility.closeRegionAndWAL(this.region);
781      this.region = null;
782      wals.close();
783    }
784  }
785
786  @Test
787  public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
788    byte[] family = Bytes.toBytes("family");
789    this.region = initHRegion(tableName, method, CONF, family);
790    try {
791      Path regiondir = region.getRegionFileSystem().getRegionDir();
792      FileSystem fs = region.getRegionFileSystem().getFileSystem();
793
794      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
795      for (int i = 1000; i < 1050; i += 10) {
796        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
797        FSDataOutputStream dos = fs.create(recoveredEdits);
798        dos.writeInt(i);
799        dos.close();
800      }
801      long minSeqId = 2000;
802      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1));
803      FSDataOutputStream dos = fs.create(recoveredEdits);
804      dos.close();
805
806      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
807      for (HStore store : region.getStores()) {
808        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId);
809      }
810      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
811      assertEquals(minSeqId, seqId);
812    } finally {
813      HBaseTestingUtility.closeRegionAndWAL(this.region);
814      this.region = null;
815    }
816  }
817
818  @Test
819  public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
820    byte[] family = Bytes.toBytes("family");
821    this.region = initHRegion(tableName, method, CONF, family);
822    final WALFactory wals = new WALFactory(CONF, method);
823    try {
824      Path regiondir = region.getRegionFileSystem().getRegionDir();
825      FileSystem fs = region.getRegionFileSystem().getFileSystem();
826      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
827      byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]);
828
829      assertEquals(0, region.getStoreFileList(columns).size());
830
831      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
832
833      long maxSeqId = 1050;
834      long minSeqId = 1000;
835
836      for (long i = minSeqId; i <= maxSeqId; i += 10) {
837        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
838        fs.create(recoveredEdits);
839        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
840
841        long time = System.nanoTime();
842        WALEdit edit = null;
843        if (i == maxSeqId) {
844          edit = WALEdit.createCompaction(region.getRegionInfo(),
845          CompactionDescriptor.newBuilder()
846          .setTableName(ByteString.copyFrom(tableName.getName()))
847          .setFamilyName(ByteString.copyFrom(regionName))
848          .setEncodedRegionName(ByteString.copyFrom(regionName))
849          .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
850          .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
851          .build());
852        } else {
853          edit = new WALEdit();
854          edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
855            .toBytes(i)));
856        }
857        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
858            HConstants.DEFAULT_CLUSTER_ID), edit));
859        writer.close();
860      }
861
862      long recoverSeqId = 1030;
863      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
864      MonitoredTask status = TaskMonitor.get().createStatus(method);
865      for (HStore store : region.getStores()) {
866        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
867      }
868      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
869      assertEquals(maxSeqId, seqId);
870
871      // assert that the files are flushed
872      assertEquals(1, region.getStoreFileList(columns).size());
873
874    } finally {
875      HBaseTestingUtility.closeRegionAndWAL(this.region);
876      this.region = null;
877      wals.close();
878    }
879  }
880
881  @Test
882  public void testRecoveredEditsReplayCompaction() throws Exception {
883    testRecoveredEditsReplayCompaction(false);
884    testRecoveredEditsReplayCompaction(true);
885  }
886
887  public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
888    CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class);
889    byte[] family = Bytes.toBytes("family");
890    this.region = initHRegion(tableName, method, CONF, family);
891    final WALFactory wals = new WALFactory(CONF, method);
892    try {
893      Path regiondir = region.getRegionFileSystem().getRegionDir();
894      FileSystem fs = region.getRegionFileSystem().getFileSystem();
895      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
896
897      long maxSeqId = 3;
898      long minSeqId = 0;
899
900      for (long i = minSeqId; i < maxSeqId; i++) {
901        Put put = new Put(Bytes.toBytes(i));
902        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
903        region.put(put);
904        region.flush(true);
905      }
906
907      // this will create a region with 3 files
908      assertEquals(3, region.getStore(family).getStorefilesCount());
909      List<Path> storeFiles = new ArrayList<>(3);
910      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
911        storeFiles.add(sf.getPath());
912      }
913
914      // disable compaction completion
915      CONF.setBoolean("hbase.hstore.compaction.complete", false);
916      region.compactStores();
917
918      // ensure that nothing changed
919      assertEquals(3, region.getStore(family).getStorefilesCount());
920
921      // now find the compacted file, and manually add it to the recovered edits
922      Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
923      FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
924      String errorMsg = "Expected to find 1 file in the region temp directory "
925          + "from the compaction, could not find any";
926      assertNotNull(errorMsg, files);
927      assertEquals(errorMsg, 1, files.length);
928      // move the file inside region dir
929      Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
930          files[0].getPath());
931
932      byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes();
933      byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length];
934      for (int i=0; i < encodedNameAsBytes.length; i++) {
935        // Mix the byte array to have a new encodedName
936        fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1);
937      }
938
939      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
940        .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family,
941            storeFiles, Lists.newArrayList(newFile),
942            region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
943
944      WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
945          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
946
947      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
948
949      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
950      fs.create(recoveredEdits);
951      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
952
953      long time = System.nanoTime();
954
955      writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time,
956          HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
957          compactionDescriptor)));
958      writer.close();
959
960      // close the region now, and reopen again
961      region.getTableDescriptor();
962      region.getRegionInfo();
963      region.close();
964      try {
965        region = HRegion.openHRegion(region, null);
966      } catch (WrongRegionException wre) {
967        fail("Matching encoded region name should not have produced WrongRegionException");
968      }
969
970      // now check whether we have only one store file, the compacted one
971      Collection<HStoreFile> sfs = region.getStore(family).getStorefiles();
972      for (HStoreFile sf : sfs) {
973        LOG.info(Objects.toString(sf.getPath()));
974      }
975      if (!mismatchedRegionName) {
976        assertEquals(1, region.getStore(family).getStorefilesCount());
977      }
978      files = FSUtils.listStatus(fs, tmpDir);
979      assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);
980
981      for (long i = minSeqId; i < maxSeqId; i++) {
982        Get get = new Get(Bytes.toBytes(i));
983        Result result = region.get(get);
984        byte[] value = result.getValue(family, Bytes.toBytes(i));
985        assertArrayEquals(Bytes.toBytes(i), value);
986      }
987    } finally {
988      HBaseTestingUtility.closeRegionAndWAL(this.region);
989      this.region = null;
990      wals.close();
991      CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class);
992    }
993  }
994
995  @Test
996  public void testFlushMarkers() throws Exception {
997    // tests that flush markers are written to WAL and handled at recovered edits
998    byte[] family = Bytes.toBytes("family");
999    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
1000    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1001    FSUtils.setRootDir(walConf, logDir);
1002    final WALFactory wals = new WALFactory(walConf, method);
1003    final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build());
1004
1005    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1006      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1007    try {
1008      Path regiondir = region.getRegionFileSystem().getRegionDir();
1009      FileSystem fs = region.getRegionFileSystem().getFileSystem();
1010      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
1011
1012      long maxSeqId = 3;
1013      long minSeqId = 0;
1014
1015      for (long i = minSeqId; i < maxSeqId; i++) {
1016        Put put = new Put(Bytes.toBytes(i));
1017        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1018        region.put(put);
1019        region.flush(true);
1020      }
1021
1022      // this will create a region with 3 files from flush
1023      assertEquals(3, region.getStore(family).getStorefilesCount());
1024      List<String> storeFiles = new ArrayList<>(3);
1025      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
1026        storeFiles.add(sf.getPath().getName());
1027      }
1028
1029      // now verify that the flush markers are written
1030      wal.shutdown();
1031      WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
1032        TEST_UTIL.getConfiguration());
1033      try {
1034        List<WAL.Entry> flushDescriptors = new ArrayList<>();
1035        long lastFlushSeqId = -1;
1036        while (true) {
1037          WAL.Entry entry = reader.next();
1038          if (entry == null) {
1039            break;
1040          }
1041          Cell cell = entry.getEdit().getCells().get(0);
1042          if (WALEdit.isMetaEditFamily(cell)) {
1043            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
1044            assertNotNull(flushDesc);
1045            assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
1046            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1047              assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
1048            } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1049              assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
1050            }
1051            lastFlushSeqId = flushDesc.getFlushSequenceNumber();
1052            assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
1053            assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
1054            StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
1055            assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
1056            assertEquals("family", storeFlushDesc.getStoreHomeDir());
1057            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1058              assertEquals(0, storeFlushDesc.getFlushOutputCount());
1059            } else {
1060              assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
1061              assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
1062            }
1063
1064            flushDescriptors.add(entry);
1065          }
1066        }
1067
1068        assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
1069
1070        // now write those markers to the recovered edits again.
1071
1072        Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
1073
1074        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1075        fs.create(recoveredEdits);
1076        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1077
1078        for (WAL.Entry entry : flushDescriptors) {
1079          writer.append(entry);
1080        }
1081        writer.close();
1082      } finally {
1083        if (null != reader) {
1084          try {
1085            reader.close();
1086          } catch (IOException exception) {
1087            LOG.warn("Problem closing wal: " + exception.getMessage());
1088            LOG.debug("exception details", exception);
1089          }
1090        }
1091      }
1092
1093
1094      // close the region now, and reopen again
1095      region.close();
1096      region = HRegion.openHRegion(region, null);
1097
1098      // now check whether we have can read back the data from region
1099      for (long i = minSeqId; i < maxSeqId; i++) {
1100        Get get = new Get(Bytes.toBytes(i));
1101        Result result = region.get(get);
1102        byte[] value = result.getValue(family, Bytes.toBytes(i));
1103        assertArrayEquals(Bytes.toBytes(i), value);
1104      }
1105    } finally {
1106      HBaseTestingUtility.closeRegionAndWAL(this.region);
1107      this.region = null;
1108      wals.close();
1109    }
1110  }
1111
1112  static class IsFlushWALMarker implements ArgumentMatcher<WALEdit> {
1113    volatile FlushAction[] actions;
1114    public IsFlushWALMarker(FlushAction... actions) {
1115      this.actions = actions;
1116    }
1117    @Override
1118    public boolean matches(WALEdit edit) {
1119      List<Cell> cells = edit.getCells();
1120      if (cells.isEmpty()) {
1121        return false;
1122      }
1123      if (WALEdit.isMetaEditFamily(cells.get(0))) {
1124        FlushDescriptor desc;
1125        try {
1126          desc = WALEdit.getFlushDescriptor(cells.get(0));
1127        } catch (IOException e) {
1128          LOG.warn(e.toString(), e);
1129          return false;
1130        }
1131        if (desc != null) {
1132          for (FlushAction action : actions) {
1133            if (desc.getAction() == action) {
1134              return true;
1135            }
1136          }
1137        }
1138      }
1139      return false;
1140    }
1141    public IsFlushWALMarker set(FlushAction... actions) {
1142      this.actions = actions;
1143      return this;
1144    }
1145  }
1146
1147  @Test
1148  public void testFlushMarkersWALFail() throws Exception {
1149    // test the cases where the WAL append for flush markers fail.
1150    byte[] family = Bytes.toBytes("family");
1151
1152    // spy an actual WAL implementation to throw exception (was not able to mock)
1153    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log");
1154
1155    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1156    FSUtils.setRootDir(walConf, logDir);
1157    // Make up a WAL that we can manipulate at append time.
1158    class FailAppendFlushMarkerWAL extends FSHLog {
1159      volatile FlushAction [] flushActions = null;
1160
1161      public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf)
1162      throws IOException {
1163        super(fs, root, logDir, conf);
1164      }
1165
1166      @Override
1167      protected Writer createWriterInstance(Path path) throws IOException {
1168        final Writer w = super.createWriterInstance(path);
1169        return new Writer() {
1170          @Override
1171          public void close() throws IOException {
1172            w.close();
1173          }
1174
1175          @Override
1176          public void sync() throws IOException {
1177            w.sync();
1178          }
1179
1180          @Override
1181          public void append(Entry entry) throws IOException {
1182            List<Cell> cells = entry.getEdit().getCells();
1183            if (WALEdit.isMetaEditFamily(cells.get(0))) {
1184              FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0));
1185              if (desc != null) {
1186                for (FlushAction flushAction: flushActions) {
1187                  if (desc.getAction().equals(flushAction)) {
1188                    throw new IOException("Failed to append flush marker! " + flushAction);
1189                  }
1190                }
1191              }
1192            }
1193            w.append(entry);
1194          }
1195
1196          @Override
1197          public long getLength() {
1198            return w.getLength();
1199          }
1200        };
1201      }
1202    }
1203    FailAppendFlushMarkerWAL wal =
1204      new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1205        method, walConf);
1206    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1207      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1208    try {
1209      int i = 0;
1210      Put put = new Put(Bytes.toBytes(i));
1211      put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
1212      put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1213      region.put(put);
1214
1215      // 1. Test case where START_FLUSH throws exception
1216      wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};
1217
1218      // start cache flush will throw exception
1219      try {
1220        region.flush(true);
1221        fail("This should have thrown exception");
1222      } catch (DroppedSnapshotException unexpected) {
1223        // this should not be a dropped snapshot exception. Meaning that RS will not abort
1224        throw unexpected;
1225      } catch (IOException expected) {
1226        // expected
1227      }
1228      // The WAL is hosed now. It has two edits appended. We cannot roll the log without it
1229      // throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
1230      region.close(true);
1231      wal.close();
1232
1233      // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
1234      wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
1235      wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1236            method, walConf);
1237
1238      this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1239        HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1240      region.put(put);
1241
1242      // 3. Test case where ABORT_FLUSH will throw exception.
1243      // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
1244      // DroppedSnapshotException. Below COMMMIT_FLUSH will cause flush to abort
1245      wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH};
1246
1247      try {
1248        region.flush(true);
1249        fail("This should have thrown exception");
1250      } catch (DroppedSnapshotException expected) {
1251        // we expect this exception, since we were able to write the snapshot, but failed to
1252        // write the flush marker to WAL
1253      } catch (IOException unexpected) {
1254        throw unexpected;
1255      }
1256
1257    } finally {
1258      HBaseTestingUtility.closeRegionAndWAL(this.region);
1259      this.region = null;
1260    }
1261  }
1262
1263  @Test
1264  public void testGetWhileRegionClose() throws IOException {
1265    Configuration hc = initSplit();
1266    int numRows = 100;
1267    byte[][] families = { fam1, fam2, fam3 };
1268
1269    // Setting up region
1270    this.region = initHRegion(tableName, method, hc, families);
1271    try {
1272      // Put data in region
1273      final int startRow = 100;
1274      putData(startRow, numRows, qual1, families);
1275      putData(startRow, numRows, qual2, families);
1276      putData(startRow, numRows, qual3, families);
1277      final AtomicBoolean done = new AtomicBoolean(false);
1278      final AtomicInteger gets = new AtomicInteger(0);
1279      GetTillDoneOrException[] threads = new GetTillDoneOrException[10];
1280      try {
1281        // Set ten threads running concurrently getting from the region.
1282        for (int i = 0; i < threads.length / 2; i++) {
1283          threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1284          threads[i].setDaemon(true);
1285          threads[i].start();
1286        }
1287        // Artificially make the condition by setting closing flag explicitly.
1288        // I can't make the issue happen with a call to region.close().
1289        this.region.closing.set(true);
1290        for (int i = threads.length / 2; i < threads.length; i++) {
1291          threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1292          threads[i].setDaemon(true);
1293          threads[i].start();
1294        }
1295      } finally {
1296        if (this.region != null) {
1297          HBaseTestingUtility.closeRegionAndWAL(this.region);
1298        }
1299      }
1300      done.set(true);
1301      for (GetTillDoneOrException t : threads) {
1302        try {
1303          t.join();
1304        } catch (InterruptedException e) {
1305          e.printStackTrace();
1306        }
1307        if (t.e != null) {
1308          LOG.info("Exception=" + t.e);
1309          assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
1310        }
1311      }
1312    } finally {
1313      HBaseTestingUtility.closeRegionAndWAL(this.region);
1314      this.region = null;
1315    }
1316  }
1317
1318  /*
1319   * Thread that does get on single row until 'done' flag is flipped. If an
1320   * exception causes us to fail, it records it.
1321   */
1322  class GetTillDoneOrException extends Thread {
1323    private final Get g;
1324    private final AtomicBoolean done;
1325    private final AtomicInteger count;
1326    private Exception e;
1327
1328    GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d,
1329        final AtomicInteger c) {
1330      super("getter." + i);
1331      this.g = new Get(r);
1332      this.done = d;
1333      this.count = c;
1334    }
1335
1336    @Override
1337    public void run() {
1338      while (!this.done.get()) {
1339        try {
1340          assertTrue(region.get(g).size() > 0);
1341          this.count.incrementAndGet();
1342        } catch (Exception e) {
1343          this.e = e;
1344          break;
1345        }
1346      }
1347    }
1348  }
1349
1350  /*
1351   * An involved filter test. Has multiple column families and deletes in mix.
1352   */
1353  @Test
1354  public void testWeirdCacheBehaviour() throws Exception {
1355    final TableName tableName = TableName.valueOf(name.getMethodName());
1356    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
1357        Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
1358    this.region = initHRegion(tableName, method, CONF, FAMILIES);
1359    try {
1360      String value = "this is the value";
1361      String value2 = "this is some other value";
1362      String keyPrefix1 = "prefix1";
1363      String keyPrefix2 = "prefix2";
1364      String keyPrefix3 = "prefix3";
1365      putRows(this.region, 3, value, keyPrefix1);
1366      putRows(this.region, 3, value, keyPrefix2);
1367      putRows(this.region, 3, value, keyPrefix3);
1368      putRows(this.region, 3, value2, keyPrefix1);
1369      putRows(this.region, 3, value2, keyPrefix2);
1370      putRows(this.region, 3, value2, keyPrefix3);
1371      System.out.println("Checking values for key: " + keyPrefix1);
1372      assertEquals("Got back incorrect number of rows from scan", 3,
1373          getNumberOfRows(keyPrefix1, value2, this.region));
1374      System.out.println("Checking values for key: " + keyPrefix2);
1375      assertEquals("Got back incorrect number of rows from scan", 3,
1376          getNumberOfRows(keyPrefix2, value2, this.region));
1377      System.out.println("Checking values for key: " + keyPrefix3);
1378      assertEquals("Got back incorrect number of rows from scan", 3,
1379          getNumberOfRows(keyPrefix3, value2, this.region));
1380      deleteColumns(this.region, value2, keyPrefix1);
1381      deleteColumns(this.region, value2, keyPrefix2);
1382      deleteColumns(this.region, value2, keyPrefix3);
1383      System.out.println("Starting important checks.....");
1384      assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0,
1385          getNumberOfRows(keyPrefix1, value2, this.region));
1386      assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0,
1387          getNumberOfRows(keyPrefix2, value2, this.region));
1388      assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0,
1389          getNumberOfRows(keyPrefix3, value2, this.region));
1390    } finally {
1391      HBaseTestingUtility.closeRegionAndWAL(this.region);
1392      this.region = null;
1393    }
1394  }
1395
1396  @Test
1397  public void testAppendWithReadOnlyTable() throws Exception {
1398    final TableName tableName = TableName.valueOf(name.getMethodName());
1399    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1400    boolean exceptionCaught = false;
1401    Append append = new Append(Bytes.toBytes("somerow"));
1402    append.setDurability(Durability.SKIP_WAL);
1403    append.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"),
1404        Bytes.toBytes("somevalue"));
1405    try {
1406      region.append(append);
1407    } catch (IOException e) {
1408      exceptionCaught = true;
1409    } finally {
1410      HBaseTestingUtility.closeRegionAndWAL(this.region);
1411      this.region = null;
1412    }
1413    assertTrue(exceptionCaught == true);
1414  }
1415
1416  @Test
1417  public void testIncrWithReadOnlyTable() throws Exception {
1418    final TableName tableName = TableName.valueOf(name.getMethodName());
1419    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1420    boolean exceptionCaught = false;
1421    Increment inc = new Increment(Bytes.toBytes("somerow"));
1422    inc.setDurability(Durability.SKIP_WAL);
1423    inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);
1424    try {
1425      region.increment(inc);
1426    } catch (IOException e) {
1427      exceptionCaught = true;
1428    } finally {
1429      HBaseTestingUtility.closeRegionAndWAL(this.region);
1430      this.region = null;
1431    }
1432    assertTrue(exceptionCaught == true);
1433  }
1434
1435  private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException {
1436    InternalScanner scanner = buildScanner(keyPrefix, value, r);
1437    int count = 0;
1438    boolean more = false;
1439    List<Cell> results = new ArrayList<>();
1440    do {
1441      more = scanner.next(results);
1442      if (results != null && !results.isEmpty())
1443        count++;
1444      else
1445        break;
1446      Delete delete = new Delete(CellUtil.cloneRow(results.get(0)));
1447      delete.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
1448      r.delete(delete);
1449      results.clear();
1450    } while (more);
1451    assertEquals("Did not perform correct number of deletes", 3, count);
1452  }
1453
1454  private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception {
1455    InternalScanner resultScanner = buildScanner(keyPrefix, value, r);
1456    int numberOfResults = 0;
1457    List<Cell> results = new ArrayList<>();
1458    boolean more = false;
1459    do {
1460      more = resultScanner.next(results);
1461      if (results != null && !results.isEmpty())
1462        numberOfResults++;
1463      else
1464        break;
1465      for (Cell kv : results) {
1466        System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv)));
1467      }
1468      results.clear();
1469    } while (more);
1470    return numberOfResults;
1471  }
1472
1473  private InternalScanner buildScanner(String keyPrefix, String value, HRegion r)
1474      throws IOException {
1475    // Defaults FilterList.Operator.MUST_PASS_ALL.
1476    FilterList allFilters = new FilterList();
1477    allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
1478    // Only return rows where this column value exists in the row.
1479    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"),
1480        Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value));
1481    filter.setFilterIfMissing(true);
1482    allFilters.addFilter(filter);
1483    Scan scan = new Scan();
1484    scan.addFamily(Bytes.toBytes("trans-blob"));
1485    scan.addFamily(Bytes.toBytes("trans-type"));
1486    scan.addFamily(Bytes.toBytes("trans-date"));
1487    scan.addFamily(Bytes.toBytes("trans-tags"));
1488    scan.addFamily(Bytes.toBytes("trans-group"));
1489    scan.setFilter(allFilters);
1490    return r.getScanner(scan);
1491  }
1492
1493  private void putRows(HRegion r, int numRows, String value, String key) throws IOException {
1494    for (int i = 0; i < numRows; i++) {
1495      String row = key + "_" + i/* UUID.randomUUID().toString() */;
1496      System.out.println(String.format("Saving row: %s, with value %s", row, value));
1497      Put put = new Put(Bytes.toBytes(row));
1498      put.setDurability(Durability.SKIP_WAL);
1499      put.addColumn(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob"));
1500      put.addColumn(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
1501      put.addColumn(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999"));
1502      put.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value));
1503      put.addColumn(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId"));
1504      r.put(put);
1505    }
1506  }
1507
1508  @Test
1509  public void testFamilyWithAndWithoutColon() throws Exception {
1510    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1511    this.region = initHRegion(tableName, method, CONF, cf);
1512    try {
1513      Put p = new Put(tableName.toBytes());
1514      byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
1515      p.addColumn(cfwithcolon, cfwithcolon, cfwithcolon);
1516      boolean exception = false;
1517      try {
1518        this.region.put(p);
1519      } catch (NoSuchColumnFamilyException e) {
1520        exception = true;
1521      }
1522      assertTrue(exception);
1523    } finally {
1524      HBaseTestingUtility.closeRegionAndWAL(this.region);
1525      this.region = null;
1526    }
1527  }
1528
1529  @Test
1530  public void testBatchPut_whileNoRowLocksHeld() throws IOException {
1531    final Put[] puts = new Put[10];
1532    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1533    try {
1534      long syncs = prepareRegionForBachPut(puts, source, false);
1535
1536      OperationStatus[] codes = this.region.batchMutate(puts);
1537      assertEquals(10, codes.length);
1538      for (int i = 0; i < 10; i++) {
1539        assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1540      }
1541      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1542
1543      LOG.info("Next a batch put with one invalid family");
1544      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1545      codes = this.region.batchMutate(puts);
1546      assertEquals(10, codes.length);
1547      for (int i = 0; i < 10; i++) {
1548        assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1549            codes[i].getOperationStatusCode());
1550      }
1551
1552      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
1553    } finally {
1554      HBaseTestingUtility.closeRegionAndWAL(this.region);
1555      this.region = null;
1556    }
1557  }
1558
  /**
   * Runs a ten-put batch (put #5 carries an unknown family) from a separate thread while this
   * thread holds row locks on {@code row_1}, {@code row_2} and {@code row_3}, and concurrently
   * asks a third thread to close the region. After the first three locks are released the test
   * waits for {@code syncTimeNumOps} to advance, then checks that every put except #5 reports
   * {@code SUCCESS} and #5 reports {@code BAD_FAMILY}.
   */
  @Test
  public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
    final Put[] puts = new Put[10];
    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
    try {
      long syncs = prepareRegionForBachPut(puts, source, false);

      // Make one put invalid so the batch result contains a mix of statuses.
      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);

      LOG.info("batchPut will have to break into four batches to avoid row locks");
      RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
      RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1"));
      RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
      // Second lock on row_3, requested with the boolean flag set; it is only released at the
      // very end of the test, after the batch results have been checked.
      RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);

      MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
      // Carries the batch result from the putter thread back to this thread.
      final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>();
      // startingPuts: putter is about to call batchMutate.
      // startingClose: closer is about to call closeRegionAndWAL.
      final CountDownLatch startingPuts = new CountDownLatch(1);
      final CountDownLatch startingClose = new CountDownLatch(1);
      TestThread putter = new TestThread(ctx) {
        @Override
        public void doWork() throws IOException {
          startingPuts.countDown();
          retFromThread.set(region.batchMutate(puts));
        }
      };
      LOG.info("...starting put thread while holding locks");
      ctx.addThread(putter);
      ctx.startThreads();

      // Now attempt to close the region from another thread. Prior to HBASE-12565
      // this would cause the in-progress batchMutate operation to fail with an
      // exception because it used to release and re-acquire the close-guard lock
      // between batches. The caller then didn't get status indicating which writes succeeded.
      // We now expect this thread to block until the batchMutate call finishes.
      Thread regionCloseThread = new TestThread(ctx) {
        @Override
        public void doWork() {
          try {
            startingPuts.await();
            // Give some time for the batch mutate to get in.
            // We don't want to race with the mutate
            Thread.sleep(10);
            startingClose.countDown();
            HBaseTestingUtility.closeRegionAndWAL(region);
          } catch (IOException e) {
            throw new RuntimeException(e);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
        }
      };
      // Started directly rather than via ctx.startThreads(); joined explicitly below.
      regionCloseThread.start();

      // Wait until both helper threads have reached their blocking calls, then give them a
      // little more time before releasing the locks.
      startingClose.await();
      startingPuts.await();
      Thread.sleep(100);
      LOG.info("...releasing row lock 1, which should let put thread continue");
      rowLock1.release();
      rowLock2.release();
      rowLock3.release();
      // Polls the metric (see waitForCounter) until the batch has produced one more sync.
      waitForCounter(source, "syncTimeNumOps", syncs + 1);

      LOG.info("...joining on put thread");
      ctx.stop();
      regionCloseThread.join();

      // Only put #5 (the one with the bogus family) may have failed.
      OperationStatus[] codes = retFromThread.get();
      for (int i = 0; i < codes.length; i++) {
        assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
            codes[i].getOperationStatusCode());
      }
      rowLock4.release();
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
      this.region = null;
    }
  }
1637
1638  private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
1639      throws InterruptedException {
1640    long startWait = System.currentTimeMillis();
1641    long currentCount;
1642    while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
1643      Thread.sleep(100);
1644      if (System.currentTimeMillis() - startWait > 10000) {
1645        fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
1646            expectedCount, currentCount));
1647      }
1648    }
1649  }
1650
  /**
   * Exercises {@code batchMutate} with {@link MutationBatchOperation}s built with their boolean
   * argument set to {@code true} (presumably the "atomic" flag, per the test name) in three
   * scenarios:
   * <ol>
   * <li>ten valid puts: all must report {@code SUCCESS} and {@code syncTimeNumOps} advances by
   * one;</li>
   * <li>the same batch run from another thread while this thread holds the lock on
   * {@code row_3}: an {@link IOException} must be captured and the counter must not move;</li>
   * <li>a batch in which put #5 has an unknown family: {@link NoSuchColumnFamilyException} is
   * expected to propagate.</li>
   * </ol>
   */
  @Test
  public void testAtomicBatchPut() throws IOException {
    final Put[] puts = new Put[10];
    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
    try {
      long syncs = prepareRegionForBachPut(puts, source, false);

      // 1. Straightforward case, should succeed
      MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true,
          HConstants.NO_NONCE, HConstants.NO_NONCE);
      OperationStatus[] codes = this.region.batchMutate(batchOp);
      assertEquals(10, codes.length);
      for (int i = 0; i < 10; i++) {
        assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
      }
      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);

      // 2. Failed to get lock
      RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
      // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this
      // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread
      MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
      // Carries the exception (if any) from the putter thread back to this thread.
      final AtomicReference<IOException> retFromThread = new AtomicReference<>();
      // Counted down by the putter once batchMutate has returned or thrown.
      final CountDownLatch finishedPuts = new CountDownLatch(1);
      final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
          HConstants
          .NO_NONCE,
          HConstants.NO_NONCE);
      TestThread putter = new TestThread(ctx) {
        @Override
        public void doWork() throws IOException {
          try {
            region.batchMutate(finalBatchOp);
          } catch (IOException ioe) {
            // The exception is the expected outcome; it is stored rather than rethrown so the
            // main thread can assert on it.
            LOG.error("test failed!", ioe);
            retFromThread.set(ioe);
          }
          finishedPuts.countDown();
        }
      };
      LOG.info("...starting put thread while holding locks");
      ctx.addThread(putter);
      ctx.startThreads();
      LOG.info("...waiting for batch puts while holding locks");
      try {
        finishedPuts.await();
      } catch (InterruptedException e) {
        // NOTE(review): the interrupt is only logged; the thread's interrupt flag is not restored.
        LOG.error("Interrupted!", e);
      } finally {
        // The row lock is held until the putter has finished, then always released.
        if (lock != null) {
          lock.release();
        }
      }
      // The batch must have failed, and no additional sync may have been recorded.
      assertNotNull(retFromThread.get());
      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);

      // 3. Exception thrown in validation
      LOG.info("Next a batch put with one invalid family");
      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
      batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
          HConstants.NO_NONCE);
      // Registered with the 'thrown' rule immediately before the call expected to throw.
      thrown.expect(NoSuchColumnFamilyException.class);
      this.region.batchMutate(batchOp);
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
      this.region = null;
    }
  }
1719
1720  @Test
1721  public void testBatchPutWithTsSlop() throws Exception {
1722    // add data with a timestamp that is too recent for range. Ensure assert
1723    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1724    final Put[] puts = new Put[10];
1725    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1726
1727    try {
1728      long syncs = prepareRegionForBachPut(puts, source, true);
1729
1730      OperationStatus[] codes = this.region.batchMutate(puts);
1731      assertEquals(10, codes.length);
1732      for (int i = 0; i < 10; i++) {
1733        assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
1734      }
1735      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1736    } finally {
1737      HBaseTestingUtility.closeRegionAndWAL(this.region);
1738      this.region = null;
1739    }
1740  }
1741
1742  /**
1743   * @return syncs initial syncTimeNumOps
1744   */
1745  private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source,
1746      boolean slop) throws IOException {
1747    this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
1748
1749    LOG.info("First a batch put with all valid puts");
1750    for (int i = 0; i < puts.length; i++) {
1751      puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) :
1752          new Put(Bytes.toBytes("row_" + i));
1753      puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value);
1754    }
1755
1756    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1757    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1758    return syncs;
1759  }
1760
1761  // ////////////////////////////////////////////////////////////////////////////
1762  // checkAndMutate tests
1763  // ////////////////////////////////////////////////////////////////////////////
  /**
   * Walks a single cell ({@code row1 / fam1:qualifier}) through a sequence of
   * {@code checkAndMutate} calls that compare against an empty byte array, concrete values and a
   * {@link NullComparator}, asserting after each call whether the mutation was applied. Each step
   * depends on the state left behind by the previous one.
   */
  @Test
  public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
    byte[] row1 = Bytes.toBytes("row1");
    byte[] fam1 = Bytes.toBytes("fam1");
    byte[] qf1 = Bytes.toBytes("qualifier");
    byte[] emptyVal = new byte[] {};
    byte[] val1 = Bytes.toBytes("value1");
    byte[] val2 = Bytes.toBytes("value2");

    // Setting up region
    this.region = initHRegion(tableName, method, CONF, fam1);
    try {
      // Build (but do not apply directly) a put that stores the empty value
      Put put = new Put(row1);
      put.addColumn(fam1, qf1, emptyVal);

      // Nothing has been written to the cell yet; the EQUAL check against the empty value is
      // expected to pass, which applies the put above.
      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
          emptyVal), put);
      assertTrue(res);

      // Build a put that stores val1
      put = new Put(row1);
      put.addColumn(fam1, qf1, val1);

      // The cell now holds the empty value, so the same check passes again and stores val1
      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
          put);
      assertTrue(res);

      // The cell holds val1 now, so it is not empty anymore and the check must fail
      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
          put);
      assertFalse(res);

      // A delete guarded by the same empty-value check must be rejected as well
      Delete delete = new Delete(row1);
      delete.addColumn(fam1, qf1);
      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
          delete);
      assertFalse(res);

      put = new Put(row1);
      put.addColumn(fam1, qf1, val2);
      // checkAndPut with the value currently stored (val1): passes and stores val2
      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
          put);
      assertTrue(res);

      // checkAndDelete with the value currently stored (val2).
      // NOTE(review): addColumn is called twice, presumably on purpose so that both stored
      // versions (val1 and val2) are removed, as in testDelete_multiDeleteColumn. The
      // empty-value check that follows relies on no value remaining - confirm before "fixing".
      delete = new Delete(row1);
      delete.addColumn(fam1, qf1);
      delete.addColumn(fam1, qf1);
      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2),
          delete);
      assertTrue(res);

      // With the column deleted, the empty-value check is expected to pass again; the mutation
      // applied here is a delete of the whole row
      delete = new Delete(row1);
      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
          delete);
      assertTrue(res);

      // checkAndPut looking for a null value
      put = new Put(row1);
      put.addColumn(fam1, qf1, val1);

      res = region
          .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put);
      assertTrue(res);
    } finally {
      HBaseTestingUtility.closeRegionAndWAL(this.region);
      this.region = null;
    }
  }
1837
1838  @Test
1839  public void testCheckAndMutate_WithWrongValue() throws IOException {
1840    byte[] row1 = Bytes.toBytes("row1");
1841    byte[] fam1 = Bytes.toBytes("fam1");
1842    byte[] qf1 = Bytes.toBytes("qualifier");
1843    byte[] val1 = Bytes.toBytes("value1");
1844    byte[] val2 = Bytes.toBytes("value2");
1845    BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
1846    BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
1847
1848    // Setting up region
1849    this.region = initHRegion(tableName, method, CONF, fam1);
1850    try {
1851      // Putting data in key
1852      Put put = new Put(row1);
1853      put.addColumn(fam1, qf1, val1);
1854      region.put(put);
1855
1856      // checkAndPut with wrong value
1857      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
1858          val2), put);
1859      assertEquals(false, res);
1860
1861      // checkAndDelete with wrong value
1862      Delete delete = new Delete(row1);
1863      delete.addFamily(fam1);
1864      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2),
1865          put);
1866      assertEquals(false, res);
1867
1868      // Putting data in key
1869      put = new Put(row1);
1870      put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1871      region.put(put);
1872
1873      // checkAndPut with wrong value
1874      res =
1875          region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1876              bd2), put);
1877      assertEquals(false, res);
1878
1879      // checkAndDelete with wrong value
1880      delete = new Delete(row1);
1881      delete.addFamily(fam1);
1882      res =
1883          region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1884              bd2), put);
1885      assertEquals(false, res);
1886    } finally {
1887      HBaseTestingUtility.closeRegionAndWAL(this.region);
1888      this.region = null;
1889    }
1890  }
1891
1892  @Test
1893  public void testCheckAndMutate_WithCorrectValue() throws IOException {
1894    byte[] row1 = Bytes.toBytes("row1");
1895    byte[] fam1 = Bytes.toBytes("fam1");
1896    byte[] qf1 = Bytes.toBytes("qualifier");
1897    byte[] val1 = Bytes.toBytes("value1");
1898    BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
1899
1900    // Setting up region
1901    this.region = initHRegion(tableName, method, CONF, fam1);
1902    try {
1903      // Putting data in key
1904      Put put = new Put(row1);
1905      put.addColumn(fam1, qf1, val1);
1906      region.put(put);
1907
1908      // checkAndPut with correct value
1909      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
1910          val1), put);
1911      assertEquals(true, res);
1912
1913      // checkAndDelete with correct value
1914      Delete delete = new Delete(row1);
1915      delete.addColumn(fam1, qf1);
1916      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
1917          delete);
1918      assertEquals(true, res);
1919
1920      // Putting data in key
1921      put = new Put(row1);
1922      put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1923      region.put(put);
1924
1925      // checkAndPut with correct value
1926      res =
1927          region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1928              bd1), put);
1929      assertEquals(true, res);
1930
1931      // checkAndDelete with correct value
1932      delete = new Delete(row1);
1933      delete.addColumn(fam1, qf1);
1934      res =
1935          region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1936              bd1), delete);
1937      assertEquals(true, res);
1938    } finally {
1939      HBaseTestingUtility.closeRegionAndWAL(this.region);
1940      this.region = null;
1941    }
1942  }
1943
1944  @Test
1945  public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
1946    byte[] row1 = Bytes.toBytes("row1");
1947    byte[] fam1 = Bytes.toBytes("fam1");
1948    byte[] qf1 = Bytes.toBytes("qualifier");
1949    byte[] val1 = Bytes.toBytes("value1");
1950    byte[] val2 = Bytes.toBytes("value2");
1951    byte[] val3 = Bytes.toBytes("value3");
1952    byte[] val4 = Bytes.toBytes("value4");
1953
1954    // Setting up region
1955    this.region = initHRegion(tableName, method, CONF, fam1);
1956    try {
1957      // Putting val3 in key
1958      Put put = new Put(row1);
1959      put.addColumn(fam1, qf1, val3);
1960      region.put(put);
1961
1962      // Test CompareOp.LESS: original = val3, compare with val3, fail
1963      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1964          new BinaryComparator(val3), put);
1965      assertEquals(false, res);
1966
1967      // Test CompareOp.LESS: original = val3, compare with val4, fail
1968      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1969          new BinaryComparator(val4), put);
1970      assertEquals(false, res);
1971
1972      // Test CompareOp.LESS: original = val3, compare with val2,
1973      // succeed (now value = val2)
1974      put = new Put(row1);
1975      put.addColumn(fam1, qf1, val2);
1976      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1977          new BinaryComparator(val2), put);
1978      assertEquals(true, res);
1979
1980      // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
1981      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1982          new BinaryComparator(val3), put);
1983      assertEquals(false, res);
1984
1985      // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
1986      // succeed (value still = val2)
1987      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1988          new BinaryComparator(val2), put);
1989      assertEquals(true, res);
1990
1991      // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
1992      // succeed (now value = val3)
1993      put = new Put(row1);
1994      put.addColumn(fam1, qf1, val3);
1995      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1996          new BinaryComparator(val1), put);
1997      assertEquals(true, res);
1998
1999      // Test CompareOp.GREATER: original = val3, compare with val3, fail
2000      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
2001          new BinaryComparator(val3), put);
2002      assertEquals(false, res);
2003
2004      // Test CompareOp.GREATER: original = val3, compare with val2, fail
2005      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
2006          new BinaryComparator(val2), put);
2007      assertEquals(false, res);
2008
2009      // Test CompareOp.GREATER: original = val3, compare with val4,
2010      // succeed (now value = val2)
2011      put = new Put(row1);
2012      put.addColumn(fam1, qf1, val2);
2013      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
2014          new BinaryComparator(val4), put);
2015      assertEquals(true, res);
2016
2017      // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
2018      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2019          new BinaryComparator(val1), put);
2020      assertEquals(false, res);
2021
2022      // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
2023      // succeed (value still = val2)
2024      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2025          new BinaryComparator(val2), put);
2026      assertEquals(true, res);
2027
2028      // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
2029      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2030          new BinaryComparator(val3), put);
2031      assertEquals(true, res);
2032    } finally {
2033      HBaseTestingUtility.closeRegionAndWAL(this.region);
2034      this.region = null;
2035    }
2036  }
2037
2038  @Test
2039  public void testCheckAndPut_ThatPutWasWritten() throws IOException {
2040    byte[] row1 = Bytes.toBytes("row1");
2041    byte[] fam1 = Bytes.toBytes("fam1");
2042    byte[] fam2 = Bytes.toBytes("fam2");
2043    byte[] qf1 = Bytes.toBytes("qualifier");
2044    byte[] val1 = Bytes.toBytes("value1");
2045    byte[] val2 = Bytes.toBytes("value2");
2046
2047    byte[][] families = { fam1, fam2 };
2048
2049    // Setting up region
2050    this.region = initHRegion(tableName, method, CONF, families);
2051    try {
2052      // Putting data in the key to check
2053      Put put = new Put(row1);
2054      put.addColumn(fam1, qf1, val1);
2055      region.put(put);
2056
2057      // Creating put to add
2058      long ts = System.currentTimeMillis();
2059      KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
2060      put = new Put(row1);
2061      put.add(kv);
2062
2063      // checkAndPut with wrong value
2064      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
2065          val1), put);
2066      assertEquals(true, res);
2067
2068      Get get = new Get(row1);
2069      get.addColumn(fam2, qf1);
2070      Cell[] actual = region.get(get).rawCells();
2071
2072      Cell[] expected = { kv };
2073
2074      assertEquals(expected.length, actual.length);
2075      for (int i = 0; i < actual.length; i++) {
2076        assertEquals(expected[i], actual[i]);
2077      }
2078    } finally {
2079      HBaseTestingUtility.closeRegionAndWAL(this.region);
2080      this.region = null;
2081    }
2082  }
2083
2084  @Test
2085  public void testCheckAndPut_wrongRowInPut() throws IOException {
2086    this.region = initHRegion(tableName, method, CONF, COLUMNS);
2087    try {
2088      Put put = new Put(row2);
2089      put.addColumn(fam1, qual1, value1);
2090      try {
2091        region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL,
2092            new BinaryComparator(value2), put);
2093        fail();
2094      } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
2095        // expected exception.
2096      }
2097    } finally {
2098      HBaseTestingUtility.closeRegionAndWAL(this.region);
2099      this.region = null;
2100    }
2101  }
2102
2103  @Test
2104  public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
2105    byte[] row1 = Bytes.toBytes("row1");
2106    byte[] fam1 = Bytes.toBytes("fam1");
2107    byte[] fam2 = Bytes.toBytes("fam2");
2108    byte[] qf1 = Bytes.toBytes("qualifier1");
2109    byte[] qf2 = Bytes.toBytes("qualifier2");
2110    byte[] qf3 = Bytes.toBytes("qualifier3");
2111    byte[] val1 = Bytes.toBytes("value1");
2112    byte[] val2 = Bytes.toBytes("value2");
2113    byte[] val3 = Bytes.toBytes("value3");
2114    byte[] emptyVal = new byte[] {};
2115
2116    byte[][] families = { fam1, fam2 };
2117
2118    // Setting up region
2119    this.region = initHRegion(tableName, method, CONF, families);
2120    try {
2121      // Put content
2122      Put put = new Put(row1);
2123      put.addColumn(fam1, qf1, val1);
2124      region.put(put);
2125      Threads.sleep(2);
2126
2127      put = new Put(row1);
2128      put.addColumn(fam1, qf1, val2);
2129      put.addColumn(fam2, qf1, val3);
2130      put.addColumn(fam2, qf2, val2);
2131      put.addColumn(fam2, qf3, val1);
2132      put.addColumn(fam1, qf3, val1);
2133      region.put(put);
2134
2135      // Multi-column delete
2136      Delete delete = new Delete(row1);
2137      delete.addColumn(fam1, qf1);
2138      delete.addColumn(fam2, qf1);
2139      delete.addColumn(fam1, qf3);
2140      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
2141          val2), delete);
2142      assertEquals(true, res);
2143
2144      Get get = new Get(row1);
2145      get.addColumn(fam1, qf1);
2146      get.addColumn(fam1, qf3);
2147      get.addColumn(fam2, qf2);
2148      Result r = region.get(get);
2149      assertEquals(2, r.size());
2150      assertArrayEquals(val1, r.getValue(fam1, qf1));
2151      assertArrayEquals(val2, r.getValue(fam2, qf2));
2152
2153      // Family delete
2154      delete = new Delete(row1);
2155      delete.addFamily(fam2);
2156      res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
2157          delete);
2158      assertEquals(true, res);
2159
2160      get = new Get(row1);
2161      r = region.get(get);
2162      assertEquals(1, r.size());
2163      assertArrayEquals(val1, r.getValue(fam1, qf1));
2164
2165      // Row delete
2166      delete = new Delete(row1);
2167      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
2168          delete);
2169      assertEquals(true, res);
2170      get = new Get(row1);
2171      r = region.get(get);
2172      assertEquals(0, r.size());
2173    } finally {
2174      HBaseTestingUtility.closeRegionAndWAL(this.region);
2175      this.region = null;
2176    }
2177  }
2178
2179  // ////////////////////////////////////////////////////////////////////////////
2180  // Delete tests
2181  // ////////////////////////////////////////////////////////////////////////////
2182  @Test
2183  public void testDelete_multiDeleteColumn() throws IOException {
2184    byte[] row1 = Bytes.toBytes("row1");
2185    byte[] fam1 = Bytes.toBytes("fam1");
2186    byte[] qual = Bytes.toBytes("qualifier");
2187    byte[] value = Bytes.toBytes("value");
2188
2189    Put put = new Put(row1);
2190    put.addColumn(fam1, qual, 1, value);
2191    put.addColumn(fam1, qual, 2, value);
2192
2193    this.region = initHRegion(tableName, method, CONF, fam1);
2194    try {
2195      region.put(put);
2196
2197      // We do support deleting more than 1 'latest' version
2198      Delete delete = new Delete(row1);
2199      delete.addColumn(fam1, qual);
2200      delete.addColumn(fam1, qual);
2201      region.delete(delete);
2202
2203      Get get = new Get(row1);
2204      get.addFamily(fam1);
2205      Result r = region.get(get);
2206      assertEquals(0, r.size());
2207    } finally {
2208      HBaseTestingUtility.closeRegionAndWAL(this.region);
2209      this.region = null;
2210    }
2211  }
2212
2213  @Test
2214  public void testDelete_CheckFamily() throws IOException {
2215    byte[] row1 = Bytes.toBytes("row1");
2216    byte[] fam1 = Bytes.toBytes("fam1");
2217    byte[] fam2 = Bytes.toBytes("fam2");
2218    byte[] fam3 = Bytes.toBytes("fam3");
2219    byte[] fam4 = Bytes.toBytes("fam4");
2220
2221    // Setting up region
2222    this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
2223    try {
2224      List<Cell> kvs = new ArrayList<>();
2225      kvs.add(new KeyValue(row1, fam4, null, null));
2226
2227      // testing existing family
2228      byte[] family = fam2;
2229      try {
2230        NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2231        deleteMap.put(family, kvs);
2232        region.delete(deleteMap, Durability.SYNC_WAL);
2233      } catch (Exception e) {
2234        fail("Family " + new String(family, StandardCharsets.UTF_8) + " does not exist");
2235      }
2236
2237      // testing non existing family
2238      boolean ok = false;
2239      family = fam4;
2240      try {
2241        NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2242        deleteMap.put(family, kvs);
2243        region.delete(deleteMap, Durability.SYNC_WAL);
2244      } catch (Exception e) {
2245        ok = true;
2246      }
2247      assertTrue("Family " + new String(family, StandardCharsets.UTF_8) + " does exist", ok);
2248    } finally {
2249      HBaseTestingUtility.closeRegionAndWAL(this.region);
2250      this.region = null;
2251    }
2252  }
2253
2254  @Test
2255  public void testDelete_mixed() throws IOException, InterruptedException {
2256    byte[] fam = Bytes.toBytes("info");
2257    byte[][] families = { fam };
2258    this.region = initHRegion(tableName, method, CONF, families);
2259    try {
2260      EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2261
2262      byte[] row = Bytes.toBytes("table_name");
2263      // column names
2264      byte[] serverinfo = Bytes.toBytes("serverinfo");
2265      byte[] splitA = Bytes.toBytes("splitA");
2266      byte[] splitB = Bytes.toBytes("splitB");
2267
2268      // add some data:
2269      Put put = new Put(row);
2270      put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2271      region.put(put);
2272
2273      put = new Put(row);
2274      put.addColumn(fam, splitB, Bytes.toBytes("reference_B"));
2275      region.put(put);
2276
2277      put = new Put(row);
2278      put.addColumn(fam, serverinfo, Bytes.toBytes("ip_address"));
2279      region.put(put);
2280
2281      // ok now delete a split:
2282      Delete delete = new Delete(row);
2283      delete.addColumns(fam, splitA);
2284      region.delete(delete);
2285
2286      // assert some things:
2287      Get get = new Get(row).addColumn(fam, serverinfo);
2288      Result result = region.get(get);
2289      assertEquals(1, result.size());
2290
2291      get = new Get(row).addColumn(fam, splitA);
2292      result = region.get(get);
2293      assertEquals(0, result.size());
2294
2295      get = new Get(row).addColumn(fam, splitB);
2296      result = region.get(get);
2297      assertEquals(1, result.size());
2298
2299      // Assert that after a delete, I can put.
2300      put = new Put(row);
2301      put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2302      region.put(put);
2303      get = new Get(row);
2304      result = region.get(get);
2305      assertEquals(3, result.size());
2306
2307      // Now delete all... then test I can add stuff back
2308      delete = new Delete(row);
2309      region.delete(delete);
2310      assertEquals(0, region.get(get).size());
2311
2312      region.put(new Put(row).addColumn(fam, splitA, Bytes.toBytes("reference_A")));
2313      result = region.get(get);
2314      assertEquals(1, result.size());
2315    } finally {
2316      HBaseTestingUtility.closeRegionAndWAL(this.region);
2317      this.region = null;
2318    }
2319  }
2320
2321  @Test
2322  public void testDeleteRowWithFutureTs() throws IOException {
2323    byte[] fam = Bytes.toBytes("info");
2324    byte[][] families = { fam };
2325    this.region = initHRegion(tableName, method, CONF, families);
2326    try {
2327      byte[] row = Bytes.toBytes("table_name");
2328      // column names
2329      byte[] serverinfo = Bytes.toBytes("serverinfo");
2330
2331      // add data in the far future
2332      Put put = new Put(row);
2333      put.addColumn(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value"));
2334      region.put(put);
2335
2336      // now delete something in the present
2337      Delete delete = new Delete(row);
2338      region.delete(delete);
2339
2340      // make sure we still see our data
2341      Get get = new Get(row).addColumn(fam, serverinfo);
2342      Result result = region.get(get);
2343      assertEquals(1, result.size());
2344
2345      // delete the future row
2346      delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3);
2347      region.delete(delete);
2348
2349      // make sure it is gone
2350      get = new Get(row).addColumn(fam, serverinfo);
2351      result = region.get(get);
2352      assertEquals(0, result.size());
2353    } finally {
2354      HBaseTestingUtility.closeRegionAndWAL(this.region);
2355      this.region = null;
2356    }
2357  }
2358
2359  /**
2360   * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
2361   * the actual timestamp
2362   */
2363  @Test
2364  public void testPutWithLatestTS() throws IOException {
2365    byte[] fam = Bytes.toBytes("info");
2366    byte[][] families = { fam };
2367    this.region = initHRegion(tableName, method, CONF, families);
2368    try {
2369      byte[] row = Bytes.toBytes("row1");
2370      // column names
2371      byte[] qual = Bytes.toBytes("qual");
2372
2373      // add data with LATEST_TIMESTAMP, put without WAL
2374      Put put = new Put(row);
2375      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2376      region.put(put);
2377
2378      // Make sure it shows up with an actual timestamp
2379      Get get = new Get(row).addColumn(fam, qual);
2380      Result result = region.get(get);
2381      assertEquals(1, result.size());
2382      Cell kv = result.rawCells()[0];
2383      LOG.info("Got: " + kv);
2384      assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2385          kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2386
2387      // Check same with WAL enabled (historically these took different
2388      // code paths, so check both)
2389      row = Bytes.toBytes("row2");
2390      put = new Put(row);
2391      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2392      region.put(put);
2393
2394      // Make sure it shows up with an actual timestamp
2395      get = new Get(row).addColumn(fam, qual);
2396      result = region.get(get);
2397      assertEquals(1, result.size());
2398      kv = result.rawCells()[0];
2399      LOG.info("Got: " + kv);
2400      assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2401          kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2402    } finally {
2403      HBaseTestingUtility.closeRegionAndWAL(this.region);
2404      this.region = null;
2405    }
2406
2407  }
2408
2409  /**
2410   * Tests that there is server-side filtering for invalid timestamp upper
2411   * bound. Note that the timestamp lower bound is automatically handled for us
2412   * by the TTL field.
2413   */
2414  @Test
2415  public void testPutWithTsSlop() throws IOException {
2416    byte[] fam = Bytes.toBytes("info");
2417    byte[][] families = { fam };
2418
2419    // add data with a timestamp that is too recent for range. Ensure assert
2420    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
2421    this.region = initHRegion(tableName, method, CONF, families);
2422    boolean caughtExcep = false;
2423    try {
2424      try {
2425        // no TS specified == use latest. should not error
2426        region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), Bytes.toBytes("value")));
2427        // TS out of range. should error
2428        region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"),
2429            System.currentTimeMillis() + 2000, Bytes.toBytes("value")));
2430        fail("Expected IOE for TS out of configured timerange");
2431      } catch (FailedSanityCheckException ioe) {
2432        LOG.debug("Received expected exception", ioe);
2433        caughtExcep = true;
2434      }
2435      assertTrue("Should catch FailedSanityCheckException", caughtExcep);
2436    } finally {
2437      HBaseTestingUtility.closeRegionAndWAL(this.region);
2438      this.region = null;
2439    }
2440  }
2441
2442  @Test
2443  public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
2444    byte[] fam1 = Bytes.toBytes("columnA");
2445    byte[] fam2 = Bytes.toBytes("columnB");
2446    this.region = initHRegion(tableName, method, CONF, fam1, fam2);
2447    try {
2448      byte[] rowA = Bytes.toBytes("rowA");
2449      byte[] rowB = Bytes.toBytes("rowB");
2450
2451      byte[] value = Bytes.toBytes("value");
2452
2453      Delete delete = new Delete(rowA);
2454      delete.addFamily(fam1);
2455
2456      region.delete(delete);
2457
2458      // now create data.
2459      Put put = new Put(rowA);
2460      put.addColumn(fam2, null, value);
2461      region.put(put);
2462
2463      put = new Put(rowB);
2464      put.addColumn(fam1, null, value);
2465      put.addColumn(fam2, null, value);
2466      region.put(put);
2467
2468      Scan scan = new Scan();
2469      scan.addFamily(fam1).addFamily(fam2);
2470      InternalScanner s = region.getScanner(scan);
2471      List<Cell> results = new ArrayList<>();
2472      s.next(results);
2473      assertTrue(CellUtil.matchingRows(results.get(0), rowA));
2474
2475      results.clear();
2476      s.next(results);
2477      assertTrue(CellUtil.matchingRows(results.get(0), rowB));
2478    } finally {
2479      HBaseTestingUtility.closeRegionAndWAL(this.region);
2480      this.region = null;
2481    }
2482  }
2483
2484  @Test
2485  public void testDataInMemoryWithoutWAL() throws IOException {
2486    FileSystem fs = FileSystem.get(CONF);
2487    Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
2488    FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
2489    // This chunk creation is done throughout the code base. Do we want to move it into core?
2490    // It is missing from this test. W/o it we NPE.
2491    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2492    HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
2493        COLUMN_FAMILY_BYTES);
2494
2495    Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2496      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1);
2497    final long originalSize = KeyValueUtil.length(originalCell);
2498
2499    Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2500      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx"));
2501    final long addSize = KeyValueUtil.length(addCell);
2502
2503    LOG.info("originalSize:" + originalSize
2504      + ", addSize:" + addSize);
2505    // start test. We expect that the addPut's durability will be replaced
2506    // by originalPut's durability.
2507
2508    // case 1:
2509    testDataInMemoryWithoutWAL(region,
2510            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2511            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2512            originalSize + addSize);
2513
2514    // case 2:
2515    testDataInMemoryWithoutWAL(region,
2516            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2517            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2518            originalSize + addSize);
2519
2520    // case 3:
2521    testDataInMemoryWithoutWAL(region,
2522            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2523            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2524            0);
2525
2526    // case 4:
2527    testDataInMemoryWithoutWAL(region,
2528            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2529            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2530            0);
2531  }
2532
2533  private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut,
2534          final Put addPut, long delta) throws IOException {
2535    final long initSize = region.getDataInMemoryWithoutWAL();
2536    // save normalCPHost and replaced by mockedCPHost
2537    RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
2538    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
2539    // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must
2540    // do below format (from Mockito doc).
2541    Mockito.doAnswer(new Answer() {
2542      @Override
2543      public Object answer(InvocationOnMock invocation) throws Throwable {
2544        MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0);
2545        mb.addOperationsFromCP(0, new Mutation[]{addPut});
2546        return null;
2547      }
2548    }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
2549    region.setCoprocessorHost(mockedCPHost);
2550    region.put(originalPut);
2551    region.setCoprocessorHost(normalCPHost);
2552    final long finalSize = region.getDataInMemoryWithoutWAL();
2553    assertEquals("finalSize:" + finalSize + ", initSize:"
2554      + initSize + ", delta:" + delta,finalSize, initSize + delta);
2555  }
2556
2557  @Test
2558  public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
2559    Delete delete = new Delete(row);
2560    delete.addColumns(fam1, qual1);
2561    doTestDelete_AndPostInsert(delete);
2562  }
2563
2564  @Test
2565  public void testaddFamily_PostInsert() throws IOException, InterruptedException {
2566    Delete delete = new Delete(row);
2567    delete.addFamily(fam1);
2568    doTestDelete_AndPostInsert(delete);
2569  }
2570
2571  public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
2572    this.region = initHRegion(tableName, method, CONF, fam1);
2573    try {
2574      EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2575      Put put = new Put(row);
2576      put.addColumn(fam1, qual1, value1);
2577      region.put(put);
2578
2579      // now delete the value:
2580      region.delete(delete);
2581
2582      // ok put data:
2583      put = new Put(row);
2584      put.addColumn(fam1, qual1, value2);
2585      region.put(put);
2586
2587      // ok get:
2588      Get get = new Get(row);
2589      get.addColumn(fam1, qual1);
2590
2591      Result r = region.get(get);
2592      assertEquals(1, r.size());
2593      assertArrayEquals(value2, r.getValue(fam1, qual1));
2594
2595      // next:
2596      Scan scan = new Scan(row);
2597      scan.addColumn(fam1, qual1);
2598      InternalScanner s = region.getScanner(scan);
2599
2600      List<Cell> results = new ArrayList<>();
2601      assertEquals(false, s.next(results));
2602      assertEquals(1, results.size());
2603      Cell kv = results.get(0);
2604
2605      assertArrayEquals(value2, CellUtil.cloneValue(kv));
2606      assertArrayEquals(fam1, CellUtil.cloneFamily(kv));
2607      assertArrayEquals(qual1, CellUtil.cloneQualifier(kv));
2608      assertArrayEquals(row, CellUtil.cloneRow(kv));
2609    } finally {
2610      HBaseTestingUtility.closeRegionAndWAL(this.region);
2611      this.region = null;
2612    }
2613  }
2614
2615  @Test
2616  public void testDelete_CheckTimestampUpdated() throws IOException {
2617    byte[] row1 = Bytes.toBytes("row1");
2618    byte[] col1 = Bytes.toBytes("col1");
2619    byte[] col2 = Bytes.toBytes("col2");
2620    byte[] col3 = Bytes.toBytes("col3");
2621
2622    // Setting up region
2623    this.region = initHRegion(tableName, method, CONF, fam1);
2624    try {
2625      // Building checkerList
2626      List<Cell> kvs = new ArrayList<>();
2627      kvs.add(new KeyValue(row1, fam1, col1, null));
2628      kvs.add(new KeyValue(row1, fam1, col2, null));
2629      kvs.add(new KeyValue(row1, fam1, col3, null));
2630
2631      NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2632      deleteMap.put(fam1, kvs);
2633      region.delete(deleteMap, Durability.SYNC_WAL);
2634
2635      // extract the key values out the memstore:
2636      // This is kinda hacky, but better than nothing...
2637      long now = System.currentTimeMillis();
2638      AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore;
2639      Cell firstCell = memstore.getActive().first();
2640      assertTrue(firstCell.getTimestamp() <= now);
2641      now = firstCell.getTimestamp();
2642      for (Cell cell : memstore.getActive().getCellSet()) {
2643        assertTrue(cell.getTimestamp() <= now);
2644        now = cell.getTimestamp();
2645      }
2646    } finally {
2647      HBaseTestingUtility.closeRegionAndWAL(this.region);
2648      this.region = null;
2649    }
2650  }
2651
2652  // ////////////////////////////////////////////////////////////////////////////
2653  // Get tests
2654  // ////////////////////////////////////////////////////////////////////////////
2655  @Test
2656  public void testGet_FamilyChecker() throws IOException {
2657    byte[] row1 = Bytes.toBytes("row1");
2658    byte[] fam1 = Bytes.toBytes("fam1");
2659    byte[] fam2 = Bytes.toBytes("False");
2660    byte[] col1 = Bytes.toBytes("col1");
2661
2662    // Setting up region
2663    this.region = initHRegion(tableName, method, CONF, fam1);
2664    try {
2665      Get get = new Get(row1);
2666      get.addColumn(fam2, col1);
2667
2668      // Test
2669      try {
2670        region.get(get);
2671      } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
2672        assertFalse(false);
2673        return;
2674      }
2675      assertFalse(true);
2676    } finally {
2677      HBaseTestingUtility.closeRegionAndWAL(this.region);
2678      this.region = null;
2679    }
2680  }
2681
2682  @Test
2683  public void testGet_Basic() throws IOException {
2684    byte[] row1 = Bytes.toBytes("row1");
2685    byte[] fam1 = Bytes.toBytes("fam1");
2686    byte[] col1 = Bytes.toBytes("col1");
2687    byte[] col2 = Bytes.toBytes("col2");
2688    byte[] col3 = Bytes.toBytes("col3");
2689    byte[] col4 = Bytes.toBytes("col4");
2690    byte[] col5 = Bytes.toBytes("col5");
2691
2692    // Setting up region
2693    this.region = initHRegion(tableName, method, CONF, fam1);
2694    try {
2695      // Add to memstore
2696      Put put = new Put(row1);
2697      put.addColumn(fam1, col1, null);
2698      put.addColumn(fam1, col2, null);
2699      put.addColumn(fam1, col3, null);
2700      put.addColumn(fam1, col4, null);
2701      put.addColumn(fam1, col5, null);
2702      region.put(put);
2703
2704      Get get = new Get(row1);
2705      get.addColumn(fam1, col2);
2706      get.addColumn(fam1, col4);
2707      // Expected result
2708      KeyValue kv1 = new KeyValue(row1, fam1, col2);
2709      KeyValue kv2 = new KeyValue(row1, fam1, col4);
2710      KeyValue[] expected = { kv1, kv2 };
2711
2712      // Test
2713      Result res = region.get(get);
2714      assertEquals(expected.length, res.size());
2715      for (int i = 0; i < res.size(); i++) {
2716        assertTrue(CellUtil.matchingRows(expected[i], res.rawCells()[i]));
2717        assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i]));
2718        assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i]));
2719      }
2720
2721      // Test using a filter on a Get
2722      Get g = new Get(row1);
2723      final int count = 2;
2724      g.setFilter(new ColumnCountGetFilter(count));
2725      res = region.get(g);
2726      assertEquals(count, res.size());
2727    } finally {
2728      HBaseTestingUtility.closeRegionAndWAL(this.region);
2729      this.region = null;
2730    }
2731  }
2732
2733  @Test
2734  public void testGet_Empty() throws IOException {
2735    byte[] row = Bytes.toBytes("row");
2736    byte[] fam = Bytes.toBytes("fam");
2737
2738    this.region = initHRegion(tableName, method, CONF, fam);
2739    try {
2740      Get get = new Get(row);
2741      get.addFamily(fam);
2742      Result r = region.get(get);
2743
2744      assertTrue(r.isEmpty());
2745    } finally {
2746      HBaseTestingUtility.closeRegionAndWAL(this.region);
2747      this.region = null;
2748    }
2749  }
2750
2751  @Test
2752  public void testGetWithFilter() throws IOException, InterruptedException {
2753    byte[] row1 = Bytes.toBytes("row1");
2754    byte[] fam1 = Bytes.toBytes("fam1");
2755    byte[] col1 = Bytes.toBytes("col1");
2756    byte[] value1 = Bytes.toBytes("value1");
2757    byte[] value2 = Bytes.toBytes("value2");
2758
2759    final int maxVersions = 3;
2760    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
2761    hcd.setMaxVersions(maxVersions);
2762    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker"));
2763    htd.addFamily(hcd);
2764    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2765    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
2766    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
2767    final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
2768    this.region = TEST_UTIL.createLocalHRegion(info, htd, wal);
2769
2770    try {
2771      // Put 4 version to memstore
2772      long ts = 0;
2773      Put put = new Put(row1, ts);
2774      put.addColumn(fam1, col1, value1);
2775      region.put(put);
2776      put = new Put(row1, ts + 1);
2777      put.addColumn(fam1, col1, Bytes.toBytes("filter1"));
2778      region.put(put);
2779      put = new Put(row1, ts + 2);
2780      put.addColumn(fam1, col1, Bytes.toBytes("filter2"));
2781      region.put(put);
2782      put = new Put(row1, ts + 3);
2783      put.addColumn(fam1, col1, value2);
2784      region.put(put);
2785
2786      Get get = new Get(row1);
2787      get.setMaxVersions();
2788      Result res = region.get(get);
2789      // Get 3 versions, the oldest version has gone from user view
2790      assertEquals(maxVersions, res.size());
2791
2792      get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value")));
2793      res = region.get(get);
2794      // When use value filter, the oldest version should still gone from user view and it
2795      // should only return one key vaule
2796      assertEquals(1, res.size());
2797      assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2798      assertEquals(ts + 3, res.rawCells()[0].getTimestamp());
2799
2800      region.flush(true);
2801      region.compact(true);
2802      Thread.sleep(1000);
2803      res = region.get(get);
2804      // After flush and compact, the result should be consistent with previous result
2805      assertEquals(1, res.size());
2806      assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2807    } finally {
2808      HBaseTestingUtility.closeRegionAndWAL(this.region);
2809      this.region = null;
2810    }
2811  }
2812
2813  // ////////////////////////////////////////////////////////////////////////////
2814  // Scanner tests
2815  // ////////////////////////////////////////////////////////////////////////////
2816  @Test
2817  public void testGetScanner_WithOkFamilies() throws IOException {
2818    byte[] fam1 = Bytes.toBytes("fam1");
2819    byte[] fam2 = Bytes.toBytes("fam2");
2820
2821    byte[][] families = { fam1, fam2 };
2822
2823    // Setting up region
2824    this.region = initHRegion(tableName, method, CONF, families);
2825    try {
2826      Scan scan = new Scan();
2827      scan.addFamily(fam1);
2828      scan.addFamily(fam2);
2829      try {
2830        region.getScanner(scan);
2831      } catch (Exception e) {
2832        assertTrue("Families could not be found in Region", false);
2833      }
2834    } finally {
2835      HBaseTestingUtility.closeRegionAndWAL(this.region);
2836      this.region = null;
2837    }
2838  }
2839
2840  @Test
2841  public void testGetScanner_WithNotOkFamilies() throws IOException {
2842    byte[] fam1 = Bytes.toBytes("fam1");
2843    byte[] fam2 = Bytes.toBytes("fam2");
2844
2845    byte[][] families = { fam1 };
2846
2847    // Setting up region
2848    this.region = initHRegion(tableName, method, CONF, families);
2849    try {
2850      Scan scan = new Scan();
2851      scan.addFamily(fam2);
2852      boolean ok = false;
2853      try {
2854        region.getScanner(scan);
2855      } catch (Exception e) {
2856        ok = true;
2857      }
2858      assertTrue("Families could not be found in Region", ok);
2859    } finally {
2860      HBaseTestingUtility.closeRegionAndWAL(this.region);
2861      this.region = null;
2862    }
2863  }
2864
2865  @Test
2866  public void testGetScanner_WithNoFamilies() throws IOException {
2867    byte[] row1 = Bytes.toBytes("row1");
2868    byte[] fam1 = Bytes.toBytes("fam1");
2869    byte[] fam2 = Bytes.toBytes("fam2");
2870    byte[] fam3 = Bytes.toBytes("fam3");
2871    byte[] fam4 = Bytes.toBytes("fam4");
2872
2873    byte[][] families = { fam1, fam2, fam3, fam4 };
2874
2875    // Setting up region
2876    this.region = initHRegion(tableName, method, CONF, families);
2877    try {
2878
2879      // Putting data in Region
2880      Put put = new Put(row1);
2881      put.addColumn(fam1, null, null);
2882      put.addColumn(fam2, null, null);
2883      put.addColumn(fam3, null, null);
2884      put.addColumn(fam4, null, null);
2885      region.put(put);
2886
2887      Scan scan = null;
2888      HRegion.RegionScannerImpl is = null;
2889
2890      // Testing to see how many scanners that is produced by getScanner,
2891      // starting
2892      // with known number, 2 - current = 1
2893      scan = new Scan();
2894      scan.addFamily(fam2);
2895      scan.addFamily(fam4);
2896      is = region.getScanner(scan);
2897      assertEquals(1, is.storeHeap.getHeap().size());
2898
2899      scan = new Scan();
2900      is = region.getScanner(scan);
2901      assertEquals(families.length - 1, is.storeHeap.getHeap().size());
2902    } finally {
2903      HBaseTestingUtility.closeRegionAndWAL(this.region);
2904      this.region = null;
2905    }
2906  }
2907
2908  /**
2909   * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
2910   *
2911   * @throws IOException
2912   */
2913  @Test
2914  public void testGetScanner_WithRegionClosed() throws IOException {
2915    byte[] fam1 = Bytes.toBytes("fam1");
2916    byte[] fam2 = Bytes.toBytes("fam2");
2917
2918    byte[][] families = { fam1, fam2 };
2919
2920    // Setting up region
2921    try {
2922      this.region = initHRegion(tableName, method, CONF, families);
2923    } catch (IOException e) {
2924      e.printStackTrace();
2925      fail("Got IOException during initHRegion, " + e.getMessage());
2926    }
2927    try {
2928      region.closed.set(true);
2929      try {
2930        region.getScanner(null);
2931        fail("Expected to get an exception during getScanner on a region that is closed");
2932      } catch (NotServingRegionException e) {
2933        // this is the correct exception that is expected
2934      } catch (IOException e) {
2935        fail("Got wrong type of exception - should be a NotServingRegionException, " +
2936            "but was an IOException: "
2937            + e.getMessage());
2938      }
2939    } finally {
2940      HBaseTestingUtility.closeRegionAndWAL(this.region);
2941      this.region = null;
2942    }
2943  }
2944
2945  @Test
2946  public void testRegionScanner_Next() throws IOException {
2947    byte[] row1 = Bytes.toBytes("row1");
2948    byte[] row2 = Bytes.toBytes("row2");
2949    byte[] fam1 = Bytes.toBytes("fam1");
2950    byte[] fam2 = Bytes.toBytes("fam2");
2951    byte[] fam3 = Bytes.toBytes("fam3");
2952    byte[] fam4 = Bytes.toBytes("fam4");
2953
2954    byte[][] families = { fam1, fam2, fam3, fam4 };
2955    long ts = System.currentTimeMillis();
2956
2957    // Setting up region
2958    this.region = initHRegion(tableName, method, CONF, families);
2959    try {
2960      // Putting data in Region
2961      Put put = null;
2962      put = new Put(row1);
2963      put.addColumn(fam1, (byte[]) null, ts, null);
2964      put.addColumn(fam2, (byte[]) null, ts, null);
2965      put.addColumn(fam3, (byte[]) null, ts, null);
2966      put.addColumn(fam4, (byte[]) null, ts, null);
2967      region.put(put);
2968
2969      put = new Put(row2);
2970      put.addColumn(fam1, (byte[]) null, ts, null);
2971      put.addColumn(fam2, (byte[]) null, ts, null);
2972      put.addColumn(fam3, (byte[]) null, ts, null);
2973      put.addColumn(fam4, (byte[]) null, ts, null);
2974      region.put(put);
2975
2976      Scan scan = new Scan();
2977      scan.addFamily(fam2);
2978      scan.addFamily(fam4);
2979      InternalScanner is = region.getScanner(scan);
2980
2981      List<Cell> res = null;
2982
2983      // Result 1
2984      List<Cell> expected1 = new ArrayList<>();
2985      expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
2986      expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
2987
2988      res = new ArrayList<>();
2989      is.next(res);
2990      for (int i = 0; i < res.size(); i++) {
2991        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected1.get(i), res.get(i)));
2992      }
2993
2994      // Result 2
2995      List<Cell> expected2 = new ArrayList<>();
2996      expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
2997      expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
2998
2999      res = new ArrayList<>();
3000      is.next(res);
3001      for (int i = 0; i < res.size(); i++) {
3002        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected2.get(i), res.get(i)));
3003      }
3004    } finally {
3005      HBaseTestingUtility.closeRegionAndWAL(this.region);
3006      this.region = null;
3007    }
3008  }
3009
3010  @Test
3011  public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException {
3012    byte[] row1 = Bytes.toBytes("row1");
3013    byte[] qf1 = Bytes.toBytes("qualifier1");
3014    byte[] qf2 = Bytes.toBytes("qualifier2");
3015    byte[] fam1 = Bytes.toBytes("fam1");
3016    byte[][] families = { fam1 };
3017
3018    long ts1 = System.currentTimeMillis();
3019    long ts2 = ts1 + 1;
3020    long ts3 = ts1 + 2;
3021
3022    // Setting up region
3023    this.region = initHRegion(tableName, method, CONF, families);
3024    try {
3025      // Putting data in Region
3026      Put put = null;
3027      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3028      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3029      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3030
3031      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3032      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3033      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3034
3035      put = new Put(row1);
3036      put.add(kv13);
3037      put.add(kv12);
3038      put.add(kv11);
3039      put.add(kv23);
3040      put.add(kv22);
3041      put.add(kv21);
3042      region.put(put);
3043
3044      // Expected
3045      List<Cell> expected = new ArrayList<>();
3046      expected.add(kv13);
3047      expected.add(kv12);
3048
3049      Scan scan = new Scan(row1);
3050      scan.addColumn(fam1, qf1);
3051      scan.setMaxVersions(MAX_VERSIONS);
3052      List<Cell> actual = new ArrayList<>();
3053      InternalScanner scanner = region.getScanner(scan);
3054
3055      boolean hasNext = scanner.next(actual);
3056      assertEquals(false, hasNext);
3057
3058      // Verify result
3059      for (int i = 0; i < expected.size(); i++) {
3060        assertEquals(expected.get(i), actual.get(i));
3061      }
3062    } finally {
3063      HBaseTestingUtility.closeRegionAndWAL(this.region);
3064      this.region = null;
3065    }
3066  }
3067
3068  @Test
3069  public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException {
3070    byte[] row1 = Bytes.toBytes("row1");
3071    byte[] qf1 = Bytes.toBytes("qualifier1");
3072    byte[] qf2 = Bytes.toBytes("qualifier2");
3073    byte[] fam1 = Bytes.toBytes("fam1");
3074    byte[][] families = { fam1 };
3075
3076    long ts1 = 1; // System.currentTimeMillis();
3077    long ts2 = ts1 + 1;
3078    long ts3 = ts1 + 2;
3079
3080    // Setting up region
3081    this.region = initHRegion(tableName, method, CONF, families);
3082    try {
3083      // Putting data in Region
3084      Put put = null;
3085      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3086      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3087      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3088
3089      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3090      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3091      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3092
3093      put = new Put(row1);
3094      put.add(kv13);
3095      put.add(kv12);
3096      put.add(kv11);
3097      put.add(kv23);
3098      put.add(kv22);
3099      put.add(kv21);
3100      region.put(put);
3101      region.flush(true);
3102
3103      // Expected
3104      List<Cell> expected = new ArrayList<>();
3105      expected.add(kv13);
3106      expected.add(kv12);
3107      expected.add(kv23);
3108      expected.add(kv22);
3109
3110      Scan scan = new Scan(row1);
3111      scan.addColumn(fam1, qf1);
3112      scan.addColumn(fam1, qf2);
3113      scan.setMaxVersions(MAX_VERSIONS);
3114      List<Cell> actual = new ArrayList<>();
3115      InternalScanner scanner = region.getScanner(scan);
3116
3117      boolean hasNext = scanner.next(actual);
3118      assertEquals(false, hasNext);
3119
3120      // Verify result
3121      for (int i = 0; i < expected.size(); i++) {
3122        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3123      }
3124    } finally {
3125      HBaseTestingUtility.closeRegionAndWAL(this.region);
3126      this.region = null;
3127    }
3128  }
3129
3130  @Test
3131  public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws
3132      IOException {
3133    byte[] row1 = Bytes.toBytes("row1");
3134    byte[] fam1 = Bytes.toBytes("fam1");
3135    byte[][] families = { fam1 };
3136    byte[] qf1 = Bytes.toBytes("qualifier1");
3137    byte[] qf2 = Bytes.toBytes("qualifier2");
3138
3139    long ts1 = 1;
3140    long ts2 = ts1 + 1;
3141    long ts3 = ts1 + 2;
3142    long ts4 = ts1 + 3;
3143
3144    // Setting up region
3145    this.region = initHRegion(tableName, method, CONF, families);
3146    try {
3147      // Putting data in Region
3148      KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3149      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3150      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3151      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3152
3153      KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3154      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3155      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3156      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3157
3158      Put put = null;
3159      put = new Put(row1);
3160      put.add(kv14);
3161      put.add(kv24);
3162      region.put(put);
3163      region.flush(true);
3164
3165      put = new Put(row1);
3166      put.add(kv23);
3167      put.add(kv13);
3168      region.put(put);
3169      region.flush(true);
3170
3171      put = new Put(row1);
3172      put.add(kv22);
3173      put.add(kv12);
3174      region.put(put);
3175      region.flush(true);
3176
3177      put = new Put(row1);
3178      put.add(kv21);
3179      put.add(kv11);
3180      region.put(put);
3181
3182      // Expected
3183      List<Cell> expected = new ArrayList<>();
3184      expected.add(kv14);
3185      expected.add(kv13);
3186      expected.add(kv12);
3187      expected.add(kv24);
3188      expected.add(kv23);
3189      expected.add(kv22);
3190
3191      Scan scan = new Scan(row1);
3192      scan.addColumn(fam1, qf1);
3193      scan.addColumn(fam1, qf2);
3194      int versions = 3;
3195      scan.setMaxVersions(versions);
3196      List<Cell> actual = new ArrayList<>();
3197      InternalScanner scanner = region.getScanner(scan);
3198
3199      boolean hasNext = scanner.next(actual);
3200      assertEquals(false, hasNext);
3201
3202      // Verify result
3203      for (int i = 0; i < expected.size(); i++) {
3204        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3205      }
3206    } finally {
3207      HBaseTestingUtility.closeRegionAndWAL(this.region);
3208      this.region = null;
3209    }
3210  }
3211
3212  @Test
3213  public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException {
3214    byte[] row1 = Bytes.toBytes("row1");
3215    byte[] qf1 = Bytes.toBytes("qualifier1");
3216    byte[] qf2 = Bytes.toBytes("qualifier2");
3217    byte[] fam1 = Bytes.toBytes("fam1");
3218    byte[][] families = { fam1 };
3219
3220    long ts1 = System.currentTimeMillis();
3221    long ts2 = ts1 + 1;
3222    long ts3 = ts1 + 2;
3223
3224    // Setting up region
3225    this.region = initHRegion(tableName, method, CONF, families);
3226    try {
3227      // Putting data in Region
3228      Put put = null;
3229      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3230      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3231      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3232
3233      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3234      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3235      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3236
3237      put = new Put(row1);
3238      put.add(kv13);
3239      put.add(kv12);
3240      put.add(kv11);
3241      put.add(kv23);
3242      put.add(kv22);
3243      put.add(kv21);
3244      region.put(put);
3245
3246      // Expected
3247      List<Cell> expected = new ArrayList<>();
3248      expected.add(kv13);
3249      expected.add(kv12);
3250      expected.add(kv23);
3251      expected.add(kv22);
3252
3253      Scan scan = new Scan(row1);
3254      scan.addFamily(fam1);
3255      scan.setMaxVersions(MAX_VERSIONS);
3256      List<Cell> actual = new ArrayList<>();
3257      InternalScanner scanner = region.getScanner(scan);
3258
3259      boolean hasNext = scanner.next(actual);
3260      assertEquals(false, hasNext);
3261
3262      // Verify result
3263      for (int i = 0; i < expected.size(); i++) {
3264        assertEquals(expected.get(i), actual.get(i));
3265      }
3266    } finally {
3267      HBaseTestingUtility.closeRegionAndWAL(this.region);
3268      this.region = null;
3269    }
3270  }
3271
3272  @Test
3273  public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException {
3274    byte[] row1 = Bytes.toBytes("row1");
3275    byte[] qf1 = Bytes.toBytes("qualifier1");
3276    byte[] qf2 = Bytes.toBytes("qualifier2");
3277    byte[] fam1 = Bytes.toBytes("fam1");
3278
3279    long ts1 = 1; // System.currentTimeMillis();
3280    long ts2 = ts1 + 1;
3281    long ts3 = ts1 + 2;
3282
3283    // Setting up region
3284    this.region = initHRegion(tableName, method, CONF, fam1);
3285    try {
3286      // Putting data in Region
3287      Put put = null;
3288      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3289      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3290      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3291
3292      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3293      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3294      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3295
3296      put = new Put(row1);
3297      put.add(kv13);
3298      put.add(kv12);
3299      put.add(kv11);
3300      put.add(kv23);
3301      put.add(kv22);
3302      put.add(kv21);
3303      region.put(put);
3304      region.flush(true);
3305
3306      // Expected
3307      List<Cell> expected = new ArrayList<>();
3308      expected.add(kv13);
3309      expected.add(kv12);
3310      expected.add(kv23);
3311      expected.add(kv22);
3312
3313      Scan scan = new Scan(row1);
3314      scan.addFamily(fam1);
3315      scan.setMaxVersions(MAX_VERSIONS);
3316      List<Cell> actual = new ArrayList<>();
3317      InternalScanner scanner = region.getScanner(scan);
3318
3319      boolean hasNext = scanner.next(actual);
3320      assertEquals(false, hasNext);
3321
3322      // Verify result
3323      for (int i = 0; i < expected.size(); i++) {
3324        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3325      }
3326    } finally {
3327      HBaseTestingUtility.closeRegionAndWAL(this.region);
3328      this.region = null;
3329    }
3330  }
3331
3332  @Test
3333  public void testScanner_StopRow1542() throws IOException {
3334    byte[] family = Bytes.toBytes("testFamily");
3335    this.region = initHRegion(tableName, method, CONF, family);
3336    try {
3337      byte[] row1 = Bytes.toBytes("row111");
3338      byte[] row2 = Bytes.toBytes("row222");
3339      byte[] row3 = Bytes.toBytes("row333");
3340      byte[] row4 = Bytes.toBytes("row444");
3341      byte[] row5 = Bytes.toBytes("row555");
3342
3343      byte[] col1 = Bytes.toBytes("Pub111");
3344      byte[] col2 = Bytes.toBytes("Pub222");
3345
3346      Put put = new Put(row1);
3347      put.addColumn(family, col1, Bytes.toBytes(10L));
3348      region.put(put);
3349
3350      put = new Put(row2);
3351      put.addColumn(family, col1, Bytes.toBytes(15L));
3352      region.put(put);
3353
3354      put = new Put(row3);
3355      put.addColumn(family, col2, Bytes.toBytes(20L));
3356      region.put(put);
3357
3358      put = new Put(row4);
3359      put.addColumn(family, col2, Bytes.toBytes(30L));
3360      region.put(put);
3361
3362      put = new Put(row5);
3363      put.addColumn(family, col1, Bytes.toBytes(40L));
3364      region.put(put);
3365
3366      Scan scan = new Scan(row3, row4);
3367      scan.setMaxVersions();
3368      scan.addColumn(family, col1);
3369      InternalScanner s = region.getScanner(scan);
3370
3371      List<Cell> results = new ArrayList<>();
3372      assertEquals(false, s.next(results));
3373      assertEquals(0, results.size());
3374    } finally {
3375      HBaseTestingUtility.closeRegionAndWAL(this.region);
3376      this.region = null;
3377    }
3378  }
3379
3380  @Test
3381  public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
3382    byte[] row1 = Bytes.toBytes("row1");
3383    byte[] fam1 = Bytes.toBytes("fam1");
3384    byte[] qf1 = Bytes.toBytes("qualifier1");
3385    byte[] qf2 = Bytes.toBytes("quateslifier2");
3386
3387    long ts1 = 1;
3388    long ts2 = ts1 + 1;
3389    long ts3 = ts1 + 2;
3390    long ts4 = ts1 + 3;
3391
3392    // Setting up region
3393    this.region = initHRegion(tableName, method, CONF, fam1);
3394    try {
3395      // Putting data in Region
3396      KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3397      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3398      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3399      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3400
3401      KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3402      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3403      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3404      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3405
3406      Put put = null;
3407      put = new Put(row1);
3408      put.add(kv14);
3409      put.add(kv24);
3410      region.put(put);
3411      region.flush(true);
3412
3413      put = new Put(row1);
3414      put.add(kv23);
3415      put.add(kv13);
3416      region.put(put);
3417      region.flush(true);
3418
3419      put = new Put(row1);
3420      put.add(kv22);
3421      put.add(kv12);
3422      region.put(put);
3423      region.flush(true);
3424
3425      put = new Put(row1);
3426      put.add(kv21);
3427      put.add(kv11);
3428      region.put(put);
3429
3430      // Expected
3431      List<KeyValue> expected = new ArrayList<>();
3432      expected.add(kv14);
3433      expected.add(kv13);
3434      expected.add(kv12);
3435      expected.add(kv24);
3436      expected.add(kv23);
3437      expected.add(kv22);
3438
3439      Scan scan = new Scan(row1);
3440      int versions = 3;
3441      scan.setMaxVersions(versions);
3442      List<Cell> actual = new ArrayList<>();
3443      InternalScanner scanner = region.getScanner(scan);
3444
3445      boolean hasNext = scanner.next(actual);
3446      assertEquals(false, hasNext);
3447
3448      // Verify result
3449      for (int i = 0; i < expected.size(); i++) {
3450        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3451      }
3452    } finally {
3453      HBaseTestingUtility.closeRegionAndWAL(this.region);
3454      this.region = null;
3455    }
3456  }
3457
3458  /**
3459   * Added for HBASE-5416
3460   *
3461   * Here we test scan optimization when only subset of CFs are used in filter
3462   * conditions.
3463   */
3464  @Test
3465  public void testScanner_JoinedScanners() throws IOException {
3466    byte[] cf_essential = Bytes.toBytes("essential");
3467    byte[] cf_joined = Bytes.toBytes("joined");
3468    byte[] cf_alpha = Bytes.toBytes("alpha");
3469    this.region = initHRegion(tableName, method, CONF, cf_essential, cf_joined, cf_alpha);
3470    try {
3471      byte[] row1 = Bytes.toBytes("row1");
3472      byte[] row2 = Bytes.toBytes("row2");
3473      byte[] row3 = Bytes.toBytes("row3");
3474
3475      byte[] col_normal = Bytes.toBytes("d");
3476      byte[] col_alpha = Bytes.toBytes("a");
3477
3478      byte[] filtered_val = Bytes.toBytes(3);
3479
3480      Put put = new Put(row1);
3481      put.addColumn(cf_essential, col_normal, Bytes.toBytes(1));
3482      put.addColumn(cf_joined, col_alpha, Bytes.toBytes(1));
3483      region.put(put);
3484
3485      put = new Put(row2);
3486      put.addColumn(cf_essential, col_alpha, Bytes.toBytes(2));
3487      put.addColumn(cf_joined, col_normal, Bytes.toBytes(2));
3488      put.addColumn(cf_alpha, col_alpha, Bytes.toBytes(2));
3489      region.put(put);
3490
3491      put = new Put(row3);
3492      put.addColumn(cf_essential, col_normal, filtered_val);
3493      put.addColumn(cf_joined, col_normal, filtered_val);
3494      region.put(put);
3495
3496      // Check two things:
3497      // 1. result list contains expected values
3498      // 2. result list is sorted properly
3499
3500      Scan scan = new Scan();
3501      Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
3502          CompareOp.NOT_EQUAL, filtered_val);
3503      scan.setFilter(filter);
3504      scan.setLoadColumnFamiliesOnDemand(true);
3505      InternalScanner s = region.getScanner(scan);
3506
3507      List<Cell> results = new ArrayList<>();
3508      assertTrue(s.next(results));
3509      assertEquals(1, results.size());
3510      results.clear();
3511
3512      assertTrue(s.next(results));
3513      assertEquals(3, results.size());
3514      assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha));
3515      assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential));
3516      assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined));
3517      results.clear();
3518
3519      assertFalse(s.next(results));
3520      assertEquals(0, results.size());
3521    } finally {
3522      HBaseTestingUtility.closeRegionAndWAL(this.region);
3523      this.region = null;
3524    }
3525  }
3526
3527  /**
3528   * HBASE-5416
3529   *
3530   * Test case when scan limits amount of KVs returned on each next() call.
3531   */
3532  @Test
3533  public void testScanner_JoinedScannersWithLimits() throws IOException {
3534    final byte[] cf_first = Bytes.toBytes("first");
3535    final byte[] cf_second = Bytes.toBytes("second");
3536
3537    this.region = initHRegion(tableName, method, CONF, cf_first, cf_second);
3538    try {
3539      final byte[] col_a = Bytes.toBytes("a");
3540      final byte[] col_b = Bytes.toBytes("b");
3541
3542      Put put;
3543
3544      for (int i = 0; i < 10; i++) {
3545        put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
3546        put.addColumn(cf_first, col_a, Bytes.toBytes(i));
3547        if (i < 5) {
3548          put.addColumn(cf_first, col_b, Bytes.toBytes(i));
3549          put.addColumn(cf_second, col_a, Bytes.toBytes(i));
3550          put.addColumn(cf_second, col_b, Bytes.toBytes(i));
3551        }
3552        region.put(put);
3553      }
3554
3555      Scan scan = new Scan();
3556      scan.setLoadColumnFamiliesOnDemand(true);
3557      Filter bogusFilter = new FilterBase() {
3558        @Override
3559        public ReturnCode filterCell(final Cell ignored) throws IOException {
3560          return ReturnCode.INCLUDE;
3561        }
3562        @Override
3563        public boolean isFamilyEssential(byte[] name) {
3564          return Bytes.equals(name, cf_first);
3565        }
3566      };
3567
3568      scan.setFilter(bogusFilter);
3569      InternalScanner s = region.getScanner(scan);
3570
3571      // Our data looks like this:
3572      // r0: first:a, first:b, second:a, second:b
3573      // r1: first:a, first:b, second:a, second:b
3574      // r2: first:a, first:b, second:a, second:b
3575      // r3: first:a, first:b, second:a, second:b
3576      // r4: first:a, first:b, second:a, second:b
3577      // r5: first:a
3578      // r6: first:a
3579      // r7: first:a
3580      // r8: first:a
3581      // r9: first:a
3582
3583      // But due to next's limit set to 3, we should get this:
3584      // r0: first:a, first:b, second:a
3585      // r0: second:b
3586      // r1: first:a, first:b, second:a
3587      // r1: second:b
3588      // r2: first:a, first:b, second:a
3589      // r2: second:b
3590      // r3: first:a, first:b, second:a
3591      // r3: second:b
3592      // r4: first:a, first:b, second:a
3593      // r4: second:b
3594      // r5: first:a
3595      // r6: first:a
3596      // r7: first:a
3597      // r8: first:a
3598      // r9: first:a
3599
3600      List<Cell> results = new ArrayList<>();
3601      int index = 0;
3602      ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build();
3603      while (true) {
3604        boolean more = s.next(results, scannerContext);
3605        if ((index >> 1) < 5) {
3606          if (index % 2 == 0) {
3607            assertEquals(3, results.size());
3608          } else {
3609            assertEquals(1, results.size());
3610          }
3611        } else {
3612          assertEquals(1, results.size());
3613        }
3614        results.clear();
3615        index++;
3616        if (!more) {
3617          break;
3618        }
3619      }
3620    } finally {
3621      HBaseTestingUtility.closeRegionAndWAL(this.region);
3622      this.region = null;
3623    }
3624  }
3625
3626  /**
3627   * Write an HFile block full with Cells whose qualifier that are identical between
3628   * 0 and Short.MAX_VALUE. See HBASE-13329.
3629   * @throws Exception
3630   */
3631  @Test
3632  public void testLongQualifier() throws Exception {
3633    byte[] family = Bytes.toBytes("family");
3634    this.region = initHRegion(tableName, method, CONF, family);
3635    byte[] q = new byte[Short.MAX_VALUE+2];
3636    Arrays.fill(q, 0, q.length-1, (byte)42);
3637    for (byte i=0; i<10; i++) {
3638      Put p = new Put(Bytes.toBytes("row"));
3639      // qualifiers that differ past Short.MAX_VALUE
3640      q[q.length-1]=i;
3641      p.addColumn(family, q, q);
3642      region.put(p);
3643    }
3644    region.flush(false);
3645    HBaseTestingUtility.closeRegionAndWAL(this.region);
3646    this.region = null;
3647  }
3648
3649  /**
3650   * Flushes the cache in a thread while scanning. The tests verify that the
3651   * scan is coherent - e.g. the returned results are always of the same or
3652   * later update as the previous results.
3653   *
3654   * @throws IOException
3655   *           scan / compact
3656   * @throws InterruptedException
3657   *           thread join
3658   */
3659  @Test
3660  public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
3661    byte[] family = Bytes.toBytes("family");
3662    int numRows = 1000;
3663    int flushAndScanInterval = 10;
3664    int compactInterval = 10 * flushAndScanInterval;
3665
3666    this.region = initHRegion(tableName, method, CONF, family);
3667    FlushThread flushThread = new FlushThread();
3668    try {
3669      flushThread.start();
3670
3671      Scan scan = new Scan();
3672      scan.addFamily(family);
3673      scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOp.EQUAL,
3674          new BinaryComparator(Bytes.toBytes(5L))));
3675
3676      int expectedCount = 0;
3677      List<Cell> res = new ArrayList<>();
3678
3679      boolean toggle = true;
3680      for (long i = 0; i < numRows; i++) {
3681        Put put = new Put(Bytes.toBytes(i));
3682        put.setDurability(Durability.SKIP_WAL);
3683        put.addColumn(family, qual1, Bytes.toBytes(i % 10));
3684        region.put(put);
3685
3686        if (i != 0 && i % compactInterval == 0) {
3687          LOG.debug("iteration = " + i+ " ts="+System.currentTimeMillis());
3688          region.compact(true);
3689        }
3690
3691        if (i % 10 == 5L) {
3692          expectedCount++;
3693        }
3694
3695        if (i != 0 && i % flushAndScanInterval == 0) {
3696          res.clear();
3697          InternalScanner scanner = region.getScanner(scan);
3698          if (toggle) {
3699            flushThread.flush();
3700          }
3701          while (scanner.next(res))
3702            ;
3703          if (!toggle) {
3704            flushThread.flush();
3705          }
3706          assertEquals("toggle="+toggle+"i=" + i + " ts="+System.currentTimeMillis(),
3707              expectedCount, res.size());
3708          toggle = !toggle;
3709        }
3710      }
3711
3712    } finally {
3713      try {
3714        flushThread.done();
3715        flushThread.join();
3716        flushThread.checkNoError();
3717      } catch (InterruptedException ie) {
3718        LOG.warn("Caught exception when joining with flushThread", ie);
3719      }
3720      HBaseTestingUtility.closeRegionAndWAL(this.region);
3721      this.region = null;
3722    }
3723  }
3724
3725  protected class FlushThread extends Thread {
3726    private volatile boolean done;
3727    private Throwable error = null;
3728
3729    FlushThread() {
3730      super("FlushThread");
3731    }
3732
3733    public void done() {
3734      done = true;
3735      synchronized (this) {
3736        interrupt();
3737      }
3738    }
3739
3740    public void checkNoError() {
3741      if (error != null) {
3742        assertNull(error);
3743      }
3744    }
3745
3746    @Override
3747    public void run() {
3748      done = false;
3749      while (!done) {
3750        synchronized (this) {
3751          try {
3752            wait();
3753          } catch (InterruptedException ignored) {
3754            if (done) {
3755              break;
3756            }
3757          }
3758        }
3759        try {
3760          region.flush(true);
3761        } catch (IOException e) {
3762          if (!done) {
3763            LOG.error("Error while flushing cache", e);
3764            error = e;
3765          }
3766          break;
3767        } catch (Throwable t) {
3768          LOG.error("Uncaught exception", t);
3769          throw t;
3770        }
3771      }
3772    }
3773
3774    public void flush() {
3775      synchronized (this) {
3776        notify();
3777      }
3778    }
3779  }
3780
3781  /**
3782   * Writes very wide records and scans for the latest every time.. Flushes and
3783   * compacts the region every now and then to keep things realistic.
3784   *
3785   * @throws IOException
3786   *           by flush / scan / compaction
3787   * @throws InterruptedException
3788   *           when joining threads
3789   */
3790  @Test
3791  public void testWritesWhileScanning() throws IOException, InterruptedException {
3792    int testCount = 100;
3793    int numRows = 1;
3794    int numFamilies = 10;
3795    int numQualifiers = 100;
3796    int flushInterval = 7;
3797    int compactInterval = 5 * flushInterval;
3798    byte[][] families = new byte[numFamilies][];
3799    for (int i = 0; i < numFamilies; i++) {
3800      families[i] = Bytes.toBytes("family" + i);
3801    }
3802    byte[][] qualifiers = new byte[numQualifiers][];
3803    for (int i = 0; i < numQualifiers; i++) {
3804      qualifiers[i] = Bytes.toBytes("qual" + i);
3805    }
3806
3807    this.region = initHRegion(tableName, method, CONF, families);
3808    FlushThread flushThread = new FlushThread();
3809    PutThread putThread = new PutThread(numRows, families, qualifiers);
3810    try {
3811      putThread.start();
3812      putThread.waitForFirstPut();
3813
3814      flushThread.start();
3815
3816      Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
3817
3818      int expectedCount = numFamilies * numQualifiers;
3819      List<Cell> res = new ArrayList<>();
3820
3821      long prevTimestamp = 0L;
3822      for (int i = 0; i < testCount; i++) {
3823
3824        if (i != 0 && i % compactInterval == 0) {
3825          region.compact(true);
3826          for (HStore store : region.getStores()) {
3827            store.closeAndArchiveCompactedFiles();
3828          }
3829        }
3830
3831        if (i != 0 && i % flushInterval == 0) {
3832          flushThread.flush();
3833        }
3834
3835        boolean previousEmpty = res.isEmpty();
3836        res.clear();
3837        InternalScanner scanner = region.getScanner(scan);
3838        while (scanner.next(res))
3839          ;
3840        if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
3841          assertEquals("i=" + i, expectedCount, res.size());
3842          long timestamp = res.get(0).getTimestamp();
3843          assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
3844              timestamp >= prevTimestamp);
3845          prevTimestamp = timestamp;
3846        }
3847      }
3848
3849      putThread.done();
3850
3851      region.flush(true);
3852
3853    } finally {
3854      try {
3855        flushThread.done();
3856        flushThread.join();
3857        flushThread.checkNoError();
3858
3859        putThread.join();
3860        putThread.checkNoError();
3861      } catch (InterruptedException ie) {
3862        LOG.warn("Caught exception when joining with flushThread", ie);
3863      }
3864
3865      try {
3866          HBaseTestingUtility.closeRegionAndWAL(this.region);
3867      } catch (DroppedSnapshotException dse) {
3868        // We could get this on way out because we interrupt the background flusher and it could
3869        // fail anywhere causing a DSE over in the background flusher... only it is not properly
3870        // dealt with so could still be memory hanging out when we get to here -- memory we can't
3871        // flush because the accounting is 'off' since original DSE.
3872      }
3873      this.region = null;
3874    }
3875  }
3876
3877  protected class PutThread extends Thread {
3878    private volatile boolean done;
3879    private volatile int numPutsFinished = 0;
3880
3881    private Throwable error = null;
3882    private int numRows;
3883    private byte[][] families;
3884    private byte[][] qualifiers;
3885
3886    private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
3887      super("PutThread");
3888      this.numRows = numRows;
3889      this.families = families;
3890      this.qualifiers = qualifiers;
3891    }
3892
3893    /**
3894     * Block calling thread until this instance of PutThread has put at least one row.
3895     */
3896    public void waitForFirstPut() throws InterruptedException {
3897      // wait until put thread actually puts some data
3898      while (isAlive() && numPutsFinished == 0) {
3899        checkNoError();
3900        Thread.sleep(50);
3901      }
3902    }
3903
3904    public void done() {
3905      done = true;
3906      synchronized (this) {
3907        interrupt();
3908      }
3909    }
3910
3911    public void checkNoError() {
3912      if (error != null) {
3913        assertNull(error);
3914      }
3915    }
3916
3917    @Override
3918    public void run() {
3919      done = false;
3920      while (!done) {
3921        try {
3922          for (int r = 0; r < numRows; r++) {
3923            byte[] row = Bytes.toBytes("row" + r);
3924            Put put = new Put(row);
3925            put.setDurability(Durability.SKIP_WAL);
3926            byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
3927            for (byte[] family : families) {
3928              for (byte[] qualifier : qualifiers) {
3929                put.addColumn(family, qualifier, numPutsFinished, value);
3930              }
3931            }
3932            region.put(put);
3933            numPutsFinished++;
3934            if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
3935              System.out.println("put iteration = " + numPutsFinished);
3936              Delete delete = new Delete(row, (long) numPutsFinished - 30);
3937              region.delete(delete);
3938            }
3939            numPutsFinished++;
3940          }
3941        } catch (InterruptedIOException e) {
3942          // This is fine. It means we are done, or didn't get the lock on time
3943          LOG.info("Interrupted", e);
3944        } catch (IOException e) {
3945          LOG.error("Error while putting records", e);
3946          error = e;
3947          break;
3948        }
3949      }
3950
3951    }
3952
3953  }
3954
3955  /**
3956   * Writes very wide records and gets the latest row every time.. Flushes and
3957   * compacts the region aggressivly to catch issues.
3958   *
3959   * @throws IOException
3960   *           by flush / scan / compaction
3961   * @throws InterruptedException
3962   *           when joining threads
3963   */
3964  @Test
3965  public void testWritesWhileGetting() throws Exception {
3966    int testCount = 50;
3967    int numRows = 1;
3968    int numFamilies = 10;
3969    int numQualifiers = 100;
3970    int compactInterval = 100;
3971    byte[][] families = new byte[numFamilies][];
3972    for (int i = 0; i < numFamilies; i++) {
3973      families[i] = Bytes.toBytes("family" + i);
3974    }
3975    byte[][] qualifiers = new byte[numQualifiers][];
3976    for (int i = 0; i < numQualifiers; i++) {
3977      qualifiers[i] = Bytes.toBytes("qual" + i);
3978    }
3979
3980
3981    // This test flushes constantly and can cause many files to be created,
3982    // possibly
3983    // extending over the ulimit. Make sure compactions are aggressive in
3984    // reducing
3985    // the number of HFiles created.
3986    Configuration conf = HBaseConfiguration.create(CONF);
3987    conf.setInt("hbase.hstore.compaction.min", 1);
3988    conf.setInt("hbase.hstore.compaction.max", 1000);
3989    this.region = initHRegion(tableName, method, conf, families);
3990    PutThread putThread = null;
3991    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
3992    try {
3993      putThread = new PutThread(numRows, families, qualifiers);
3994      putThread.start();
3995      putThread.waitForFirstPut();
3996
3997      // Add a thread that flushes as fast as possible
3998      ctx.addThread(new RepeatingTestThread(ctx) {
3999
4000        @Override
4001        public void doAnAction() throws Exception {
4002          region.flush(true);
4003          // Compact regularly to avoid creating too many files and exceeding
4004          // the ulimit.
4005          region.compact(false);
4006          for (HStore store : region.getStores()) {
4007            store.closeAndArchiveCompactedFiles();
4008          }
4009        }
4010      });
4011      ctx.startThreads();
4012
4013      Get get = new Get(Bytes.toBytes("row0"));
4014      Result result = null;
4015
4016      int expectedCount = numFamilies * numQualifiers;
4017
4018      long prevTimestamp = 0L;
4019      for (int i = 0; i < testCount; i++) {
4020        LOG.info("testWritesWhileGetting verify turn " + i);
4021        boolean previousEmpty = result == null || result.isEmpty();
4022        result = region.get(get);
4023        if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
4024          assertEquals("i=" + i, expectedCount, result.size());
4025          // TODO this was removed, now what dangit?!
4026          // search looking for the qualifier in question?
4027          long timestamp = 0;
4028          for (Cell kv : result.rawCells()) {
4029            if (CellUtil.matchingFamily(kv, families[0])
4030                && CellUtil.matchingQualifier(kv, qualifiers[0])) {
4031              timestamp = kv.getTimestamp();
4032            }
4033          }
4034          assertTrue(timestamp >= prevTimestamp);
4035          prevTimestamp = timestamp;
4036          Cell previousKV = null;
4037
4038          for (Cell kv : result.rawCells()) {
4039            byte[] thisValue = CellUtil.cloneValue(kv);
4040            if (previousKV != null) {
4041              if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) {
4042                LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV
4043                    + "(memStoreTS:" + previousKV.getSequenceId() + ")" + ", New KV: " + kv
4044                    + "(memStoreTS:" + kv.getSequenceId() + ")");
4045                assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue));
4046              }
4047            }
4048            previousKV = kv;
4049          }
4050        }
4051      }
4052    } finally {
4053      if (putThread != null)
4054        putThread.done();
4055
4056      region.flush(true);
4057
4058      if (putThread != null) {
4059        putThread.join();
4060        putThread.checkNoError();
4061      }
4062
4063      ctx.stop();
4064      HBaseTestingUtility.closeRegionAndWAL(this.region);
4065      this.region = null;
4066    }
4067  }
4068
4069  @Test
4070  public void testHolesInMeta() throws Exception {
4071    byte[] family = Bytes.toBytes("family");
4072    this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
4073        false, family);
4074    try {
4075      byte[] rowNotServed = Bytes.toBytes("a");
4076      Get g = new Get(rowNotServed);
4077      try {
4078        region.get(g);
4079        fail();
4080      } catch (WrongRegionException x) {
4081        // OK
4082      }
4083      byte[] row = Bytes.toBytes("y");
4084      g = new Get(row);
4085      region.get(g);
4086    } finally {
4087      HBaseTestingUtility.closeRegionAndWAL(this.region);
4088      this.region = null;
4089    }
4090  }
4091
4092  @Test
4093  public void testIndexesScanWithOneDeletedRow() throws IOException {
4094    byte[] family = Bytes.toBytes("family");
4095
4096    // Setting up region
4097    this.region = initHRegion(tableName, method, CONF, family);
4098    try {
4099      Put put = new Put(Bytes.toBytes(1L));
4100      put.addColumn(family, qual1, 1L, Bytes.toBytes(1L));
4101      region.put(put);
4102
4103      region.flush(true);
4104
4105      Delete delete = new Delete(Bytes.toBytes(1L), 1L);
4106      region.delete(delete);
4107
4108      put = new Put(Bytes.toBytes(2L));
4109      put.addColumn(family, qual1, 2L, Bytes.toBytes(2L));
4110      region.put(put);
4111
4112      Scan idxScan = new Scan();
4113      idxScan.addFamily(family);
4114      idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList(
4115          new SingleColumnValueFilter(family, qual1, CompareOp.GREATER_OR_EQUAL,
4116              new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1,
4117              CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L))))));
4118      InternalScanner scanner = region.getScanner(idxScan);
4119      List<Cell> res = new ArrayList<>();
4120
4121      while (scanner.next(res))
4122        ;
4123      assertEquals(1L, res.size());
4124    } finally {
4125      HBaseTestingUtility.closeRegionAndWAL(this.region);
4126      this.region = null;
4127    }
4128  }
4129
4130  // ////////////////////////////////////////////////////////////////////////////
4131  // Bloom filter test
4132  // ////////////////////////////////////////////////////////////////////////////
4133  @Test
4134  public void testBloomFilterSize() throws IOException {
4135    byte[] fam1 = Bytes.toBytes("fam1");
4136    byte[] qf1 = Bytes.toBytes("col");
4137    byte[] val1 = Bytes.toBytes("value1");
4138    // Create Table
4139    HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE)
4140        .setBloomFilterType(BloomType.ROWCOL);
4141
4142    HTableDescriptor htd = new HTableDescriptor(tableName);
4143    htd.addFamily(hcd);
4144    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4145    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4146    try {
4147      int num_unique_rows = 10;
4148      int duplicate_multiplier = 2;
4149      int num_storefiles = 4;
4150
4151      int version = 0;
4152      for (int f = 0; f < num_storefiles; f++) {
4153        for (int i = 0; i < duplicate_multiplier; i++) {
4154          for (int j = 0; j < num_unique_rows; j++) {
4155            Put put = new Put(Bytes.toBytes("row" + j));
4156            put.setDurability(Durability.SKIP_WAL);
4157            long ts = version++;
4158            put.addColumn(fam1, qf1, ts, val1);
4159            region.put(put);
4160          }
4161        }
4162        region.flush(true);
4163      }
4164      // before compaction
4165      HStore store = region.getStore(fam1);
4166      Collection<HStoreFile> storeFiles = store.getStorefiles();
4167      for (HStoreFile storefile : storeFiles) {
4168        StoreFileReader reader = storefile.getReader();
4169        reader.loadFileInfo();
4170        reader.loadBloomfilter();
4171        assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
4172        assertEquals(num_unique_rows, reader.getFilterEntries());
4173      }
4174
4175      region.compact(true);
4176
4177      // after compaction
4178      storeFiles = store.getStorefiles();
4179      for (HStoreFile storefile : storeFiles) {
4180        StoreFileReader reader = storefile.getReader();
4181        reader.loadFileInfo();
4182        reader.loadBloomfilter();
4183        assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());
4184        assertEquals(num_unique_rows, reader.getFilterEntries());
4185      }
4186    } finally {
4187      HBaseTestingUtility.closeRegionAndWAL(this.region);
4188      this.region = null;
4189    }
4190  }
4191
4192  @Test
4193  public void testAllColumnsWithBloomFilter() throws IOException {
4194    byte[] TABLE = Bytes.toBytes(name.getMethodName());
4195    byte[] FAMILY = Bytes.toBytes("family");
4196
4197    // Create table
4198    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE)
4199        .setBloomFilterType(BloomType.ROWCOL);
4200    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
4201    htd.addFamily(hcd);
4202    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4203    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4204    try {
4205      // For row:0, col:0: insert versions 1 through 5.
4206      byte row[] = Bytes.toBytes("row:" + 0);
4207      byte column[] = Bytes.toBytes("column:" + 0);
4208      Put put = new Put(row);
4209      put.setDurability(Durability.SKIP_WAL);
4210      for (long idx = 1; idx <= 4; idx++) {
4211        put.addColumn(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx));
4212      }
4213      region.put(put);
4214
4215      // Flush
4216      region.flush(true);
4217
4218      // Get rows
4219      Get get = new Get(row);
4220      get.setMaxVersions();
4221      Cell[] kvs = region.get(get).rawCells();
4222
4223      // Check if rows are correct
4224      assertEquals(4, kvs.length);
4225      checkOneCell(kvs[0], FAMILY, 0, 0, 4);
4226      checkOneCell(kvs[1], FAMILY, 0, 0, 3);
4227      checkOneCell(kvs[2], FAMILY, 0, 0, 2);
4228      checkOneCell(kvs[3], FAMILY, 0, 0, 1);
4229    } finally {
4230      HBaseTestingUtility.closeRegionAndWAL(this.region);
4231      this.region = null;
4232    }
4233  }
4234
4235  /**
4236   * Testcase to cover bug-fix for HBASE-2823 Ensures correct delete when
4237   * issuing delete row on columns with bloom filter set to row+col
4238   * (BloomType.ROWCOL)
4239   */
4240  @Test
4241  public void testDeleteRowWithBloomFilter() throws IOException {
4242    byte[] familyName = Bytes.toBytes("familyName");
4243
4244    // Create Table
4245    HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE)
4246        .setBloomFilterType(BloomType.ROWCOL);
4247
4248    HTableDescriptor htd = new HTableDescriptor(tableName);
4249    htd.addFamily(hcd);
4250    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4251    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4252    try {
4253      // Insert some data
4254      byte row[] = Bytes.toBytes("row1");
4255      byte col[] = Bytes.toBytes("col1");
4256
4257      Put put = new Put(row);
4258      put.addColumn(familyName, col, 1, Bytes.toBytes("SomeRandomValue"));
4259      region.put(put);
4260      region.flush(true);
4261
4262      Delete del = new Delete(row);
4263      region.delete(del);
4264      region.flush(true);
4265
4266      // Get remaining rows (should have none)
4267      Get get = new Get(row);
4268      get.addColumn(familyName, col);
4269
4270      Cell[] keyValues = region.get(get).rawCells();
4271      assertTrue(keyValues.length == 0);
4272    } finally {
4273      HBaseTestingUtility.closeRegionAndWAL(this.region);
4274      this.region = null;
4275    }
4276  }
4277
4278  @Test
4279  public void testgetHDFSBlocksDistribution() throws Exception {
4280    HBaseTestingUtility htu = new HBaseTestingUtility();
4281    // Why do we set the block size in this test?  If we set it smaller than the kvs, then we'll
4282    // break up the file in to more pieces that can be distributed across the three nodes and we
4283    // won't be able to have the condition this test asserts; that at least one node has
4284    // a copy of all replicas -- if small block size, then blocks are spread evenly across the
4285    // the three nodes.  hfilev3 with tags seems to put us over the block size.  St.Ack.
4286    // final int DEFAULT_BLOCK_SIZE = 1024;
4287    // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
4288    htu.getConfiguration().setInt("dfs.replication", 2);
4289
4290    // set up a cluster with 3 nodes
4291    MiniHBaseCluster cluster = null;
4292    String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
4293    int regionServersCount = 3;
4294
4295    try {
4296      cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
4297      byte[][] families = { fam1, fam2 };
4298      Table ht = htu.createTable(tableName, families);
4299
4300      // Setting up region
4301      byte row[] = Bytes.toBytes("row1");
4302      byte col[] = Bytes.toBytes("col1");
4303
4304      Put put = new Put(row);
4305      put.addColumn(fam1, col, 1, Bytes.toBytes("test1"));
4306      put.addColumn(fam2, col, 1, Bytes.toBytes("test2"));
4307      ht.put(put);
4308
4309      HRegion firstRegion = htu.getHBaseCluster().getRegions(tableName).get(0);
4310      firstRegion.flush(true);
4311      HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution();
4312
4313      // Given the default replication factor is 2 and we have 2 HFiles,
4314      // we will have total of 4 replica of blocks on 3 datanodes; thus there
4315      // must be at least one host that have replica for 2 HFiles. That host's
4316      // weight will be equal to the unique block weight.
4317      long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight();
4318      StringBuilder sb = new StringBuilder();
4319      for (String host: blocksDistribution1.getTopHosts()) {
4320        if (sb.length() > 0) sb.append(", ");
4321        sb.append(host);
4322        sb.append("=");
4323        sb.append(blocksDistribution1.getWeight(host));
4324      }
4325
4326      String topHost = blocksDistribution1.getTopHosts().get(0);
4327      long topHostWeight = blocksDistribution1.getWeight(topHost);
4328      String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" +
4329        topHostWeight + ", topHost=" + topHost + "; " + sb.toString();
4330      LOG.info(msg);
4331      assertTrue(msg, uniqueBlocksWeight1 == topHostWeight);
4332
4333      // use the static method to compute the value, it should be the same.
4334      // static method is used by load balancer or other components
4335      HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(
4336          htu.getConfiguration(), firstRegion.getTableDescriptor(), firstRegion.getRegionInfo());
4337      long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight();
4338
4339      assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
4340
4341      ht.close();
4342    } finally {
4343      if (cluster != null) {
4344        htu.shutdownMiniCluster();
4345      }
4346    }
4347  }
4348
4349  /**
4350   * Testcase to check state of region initialization task set to ABORTED or not
4351   * if any exceptions during initialization
4352   *
4353   * @throws Exception
4354   */
4355  @Test
4356  public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception {
4357    HRegionInfo info;
4358    try {
4359      FileSystem fs = Mockito.mock(FileSystem.class);
4360      Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException());
4361      HTableDescriptor htd = new HTableDescriptor(tableName);
4362      htd.addFamily(new HColumnDescriptor("cf"));
4363      info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
4364          HConstants.EMPTY_BYTE_ARRAY, false);
4365      Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
4366      region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
4367      // region initialization throws IOException and set task state to ABORTED.
4368      region.initialize();
4369      fail("Region initialization should fail due to IOException");
4370    } catch (IOException io) {
4371      List<MonitoredTask> tasks = TaskMonitor.get().getTasks();
4372      for (MonitoredTask monitoredTask : tasks) {
4373        if (!(monitoredTask instanceof MonitoredRPCHandler)
4374            && monitoredTask.getDescription().contains(region.toString())) {
4375          assertTrue("Region state should be ABORTED.",
4376              monitoredTask.getState().equals(MonitoredTask.State.ABORTED));
4377          break;
4378        }
4379      }
4380    } finally {
4381      HBaseTestingUtility.closeRegionAndWAL(region);
4382    }
4383  }
4384
4385  /**
4386   * Verifies that the .regioninfo file is written on region creation and that
4387   * is recreated if missing during region opening.
4388   */
4389  @Test
4390  public void testRegionInfoFileCreation() throws IOException {
4391    Path rootDir = new Path(dir + "testRegionInfoFileCreation");
4392
4393    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4394    htd.addFamily(new HColumnDescriptor("cf"));
4395
4396    HRegionInfo hri = new HRegionInfo(htd.getTableName());
4397
4398    // Create a region and skip the initialization (like CreateTableHandler)
4399    HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, rootDir, CONF, htd, false);
4400    Path regionDir = region.getRegionFileSystem().getRegionDir();
4401    FileSystem fs = region.getRegionFileSystem().getFileSystem();
4402    HBaseTestingUtility.closeRegionAndWAL(region);
4403
4404    Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
4405
4406    // Verify that the .regioninfo file is present
4407    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4408        fs.exists(regionInfoFile));
4409
4410    // Try to open the region
4411    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4412    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4413    HBaseTestingUtility.closeRegionAndWAL(region);
4414
4415    // Verify that the .regioninfo file is still there
4416    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4417        fs.exists(regionInfoFile));
4418
4419    // Remove the .regioninfo file and verify is recreated on region open
4420    fs.delete(regionInfoFile, true);
4421    assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
4422        fs.exists(regionInfoFile));
4423
4424    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4425//    region = TEST_UTIL.openHRegion(hri, htd);
4426    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4427    HBaseTestingUtility.closeRegionAndWAL(region);
4428
4429    // Verify that the .regioninfo file is still there
4430    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4431        fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE)));
4432  }
4433
4434  /**
4435   * TestCase for increment
4436   */
4437  private static class Incrementer implements Runnable {
4438    private HRegion region;
4439    private final static byte[] incRow = Bytes.toBytes("incRow");
4440    private final static byte[] family = Bytes.toBytes("family");
4441    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4442    private final static long ONE = 1L;
4443    private int incCounter;
4444
4445    public Incrementer(HRegion region, int incCounter) {
4446      this.region = region;
4447      this.incCounter = incCounter;
4448    }
4449
4450    @Override
4451    public void run() {
4452      int count = 0;
4453      while (count < incCounter) {
4454        Increment inc = new Increment(incRow);
4455        inc.addColumn(family, qualifier, ONE);
4456        count++;
4457        try {
4458          region.increment(inc);
4459        } catch (IOException e) {
4460          LOG.info("Count=" + count + ", " + e);
4461          break;
4462        }
4463      }
4464    }
4465  }
4466
4467  /**
4468   * Test case to check increment function with memstore flushing
4469   * @throws Exception
4470   */
4471  @Test
4472  public void testParallelIncrementWithMemStoreFlush() throws Exception {
4473    byte[] family = Incrementer.family;
4474    this.region = initHRegion(tableName, method, CONF, family);
4475    final HRegion region = this.region;
4476    final AtomicBoolean incrementDone = new AtomicBoolean(false);
4477    Runnable flusher = new Runnable() {
4478      @Override
4479      public void run() {
4480        while (!incrementDone.get()) {
4481          try {
4482            region.flush(true);
4483          } catch (Exception e) {
4484            e.printStackTrace();
4485          }
4486        }
4487      }
4488    };
4489
4490    // after all increment finished, the row will increment to 20*100 = 2000
4491    int threadNum = 20;
4492    int incCounter = 100;
4493    long expected = (long) threadNum * incCounter;
4494    Thread[] incrementers = new Thread[threadNum];
4495    Thread flushThread = new Thread(flusher);
4496    for (int i = 0; i < threadNum; i++) {
4497      incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
4498      incrementers[i].start();
4499    }
4500    flushThread.start();
4501    for (int i = 0; i < threadNum; i++) {
4502      incrementers[i].join();
4503    }
4504
4505    incrementDone.set(true);
4506    flushThread.join();
4507
4508    Get get = new Get(Incrementer.incRow);
4509    get.addColumn(Incrementer.family, Incrementer.qualifier);
4510    get.setMaxVersions(1);
4511    Result res = this.region.get(get);
4512    List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
4513
4514    // we just got the latest version
4515    assertEquals(1, kvs.size());
4516    Cell kv = kvs.get(0);
4517    assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
4518    this.region = null;
4519  }
4520
4521  /**
4522   * TestCase for append
4523   */
4524  private static class Appender implements Runnable {
4525    private HRegion region;
4526    private final static byte[] appendRow = Bytes.toBytes("appendRow");
4527    private final static byte[] family = Bytes.toBytes("family");
4528    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4529    private final static byte[] CHAR = Bytes.toBytes("a");
4530    private int appendCounter;
4531
4532    public Appender(HRegion region, int appendCounter) {
4533      this.region = region;
4534      this.appendCounter = appendCounter;
4535    }
4536
4537    @Override
4538    public void run() {
4539      int count = 0;
4540      while (count < appendCounter) {
4541        Append app = new Append(appendRow);
4542        app.addColumn(family, qualifier, CHAR);
4543        count++;
4544        try {
4545          region.append(app);
4546        } catch (IOException e) {
4547          LOG.info("Count=" + count + ", max=" + appendCounter + ", " + e);
4548          break;
4549        }
4550      }
4551    }
4552  }
4553
4554  /**
4555   * Test case to check append function with memstore flushing
4556   * @throws Exception
4557   */
4558  @Test
4559  public void testParallelAppendWithMemStoreFlush() throws Exception {
4560    byte[] family = Appender.family;
4561    this.region = initHRegion(tableName, method, CONF, family);
4562    final HRegion region = this.region;
4563    final AtomicBoolean appendDone = new AtomicBoolean(false);
4564    Runnable flusher = new Runnable() {
4565      @Override
4566      public void run() {
4567        while (!appendDone.get()) {
4568          try {
4569            region.flush(true);
4570          } catch (Exception e) {
4571            e.printStackTrace();
4572          }
4573        }
4574      }
4575    };
4576
4577    // After all append finished, the value will append to threadNum *
4578    // appendCounter Appender.CHAR
4579    int threadNum = 20;
4580    int appendCounter = 100;
4581    byte[] expected = new byte[threadNum * appendCounter];
4582    for (int i = 0; i < threadNum * appendCounter; i++) {
4583      System.arraycopy(Appender.CHAR, 0, expected, i, 1);
4584    }
4585    Thread[] appenders = new Thread[threadNum];
4586    Thread flushThread = new Thread(flusher);
4587    for (int i = 0; i < threadNum; i++) {
4588      appenders[i] = new Thread(new Appender(this.region, appendCounter));
4589      appenders[i].start();
4590    }
4591    flushThread.start();
4592    for (int i = 0; i < threadNum; i++) {
4593      appenders[i].join();
4594    }
4595
4596    appendDone.set(true);
4597    flushThread.join();
4598
4599    Get get = new Get(Appender.appendRow);
4600    get.addColumn(Appender.family, Appender.qualifier);
4601    get.setMaxVersions(1);
4602    Result res = this.region.get(get);
4603    List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
4604
4605    // we just got the latest version
4606    assertEquals(1, kvs.size());
4607    Cell kv = kvs.get(0);
4608    byte[] appendResult = new byte[kv.getValueLength()];
4609    System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
4610    assertArrayEquals(expected, appendResult);
4611    this.region = null;
4612  }
4613
4614  /**
4615   * Test case to check put function with memstore flushing for same row, same ts
4616   * @throws Exception
4617   */
4618  @Test
4619  public void testPutWithMemStoreFlush() throws Exception {
4620    byte[] family = Bytes.toBytes("family");
4621    byte[] qualifier = Bytes.toBytes("qualifier");
4622    byte[] row = Bytes.toBytes("putRow");
4623    byte[] value = null;
4624    this.region = initHRegion(tableName, method, CONF, family);
4625    Put put = null;
4626    Get get = null;
4627    List<Cell> kvs = null;
4628    Result res = null;
4629
4630    put = new Put(row);
4631    value = Bytes.toBytes("value0");
4632    put.addColumn(family, qualifier, 1234567L, value);
4633    region.put(put);
4634    get = new Get(row);
4635    get.addColumn(family, qualifier);
4636    get.setMaxVersions();
4637    res = this.region.get(get);
4638    kvs = res.getColumnCells(family, qualifier);
4639    assertEquals(1, kvs.size());
4640    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4641
4642    region.flush(true);
4643    get = new Get(row);
4644    get.addColumn(family, qualifier);
4645    get.setMaxVersions();
4646    res = this.region.get(get);
4647    kvs = res.getColumnCells(family, qualifier);
4648    assertEquals(1, kvs.size());
4649    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4650
4651    put = new Put(row);
4652    value = Bytes.toBytes("value1");
4653    put.addColumn(family, qualifier, 1234567L, value);
4654    region.put(put);
4655    get = new Get(row);
4656    get.addColumn(family, qualifier);
4657    get.setMaxVersions();
4658    res = this.region.get(get);
4659    kvs = res.getColumnCells(family, qualifier);
4660    assertEquals(1, kvs.size());
4661    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4662
4663    region.flush(true);
4664    get = new Get(row);
4665    get.addColumn(family, qualifier);
4666    get.setMaxVersions();
4667    res = this.region.get(get);
4668    kvs = res.getColumnCells(family, qualifier);
4669    assertEquals(1, kvs.size());
4670    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4671  }
4672
4673  @Test
4674  public void testDurability() throws Exception {
4675    // there are 5 x 5 cases:
4676    // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation
4677    // durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT)
4678
4679    // expected cases for append and sync wal
4680    durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4681    durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4682    durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4683
4684    durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4685    durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4686    durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4687
4688    durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4689    durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4690
4691    durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
4692    durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4693
4694    durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
4695    durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
4696    durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
4697
4698    // expected cases for async wal
4699    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4700    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4701    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4702    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4703    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
4704    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4705
4706    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4707    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4708    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4709    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4710    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
4711    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
4712
4713    // expect skip wal cases
4714    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4715    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4716    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4717    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
4718    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
4719    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
4720
4721  }
4722
4723  private void durabilityTest(String method, Durability tableDurability,
4724      Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
4725      final boolean expectSyncFromLogSyncer) throws Exception {
4726    Configuration conf = HBaseConfiguration.create(CONF);
4727    method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
4728    byte[] family = Bytes.toBytes("family");
4729    Path logDir = new Path(new Path(dir + method), "log");
4730    final Configuration walConf = new Configuration(conf);
4731    FSUtils.setRootDir(walConf, logDir);
4732    // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not
4733    // deal with classes which have a field of an inner class. See discussions in HBASE-15536.
4734    walConf.set(WALFactory.WAL_PROVIDER, "filesystem");
4735    final WALFactory wals = new WALFactory(walConf, UUID.randomUUID().toString());
4736    final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()));
4737    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
4738        HConstants.EMPTY_END_ROW, false, tableDurability, wal,
4739        new byte[][] { family });
4740
4741    Put put = new Put(Bytes.toBytes("r1"));
4742    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
4743    put.setDurability(mutationDurability);
4744    region.put(put);
4745
4746    //verify append called or not
4747    verify(wal, expectAppend ? times(1) : never())
4748      .append((HRegionInfo)any(), (WALKeyImpl)any(),
4749          (WALEdit)any(), Mockito.anyBoolean());
4750
4751    // verify sync called or not
4752    if (expectSync || expectSyncFromLogSyncer) {
4753      TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
4754        @Override
4755        public boolean evaluate() throws Exception {
4756          try {
4757            if (expectSync) {
4758              verify(wal, times(1)).sync(anyLong()); // Hregion calls this one
4759            } else if (expectSyncFromLogSyncer) {
4760              verify(wal, times(1)).sync(); // wal syncer calls this one
4761            }
4762          } catch (Throwable ignore) {
4763          }
4764          return true;
4765        }
4766      });
4767    } else {
4768      //verify(wal, never()).sync(anyLong());
4769      verify(wal, never()).sync();
4770    }
4771
4772    HBaseTestingUtility.closeRegionAndWAL(this.region);
4773    wals.close();
4774    this.region = null;
4775  }
4776
4777  @Test
4778  public void testRegionReplicaSecondary() throws IOException {
4779    // create a primary region, load some data and flush
4780    // create a secondary region, and do a get against that
4781    Path rootDir = new Path(dir + name.getMethodName());
4782    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4783
4784    byte[][] families = new byte[][] {
4785        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4786    };
4787    byte[] cq = Bytes.toBytes("cq");
4788    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4789    for (byte[] family : families) {
4790      htd.addFamily(new HColumnDescriptor(family));
4791    }
4792
4793    long time = System.currentTimeMillis();
4794    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4795      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4796      false, time, 0);
4797    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4798      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4799      false, time, 1);
4800
4801    HRegion primaryRegion = null, secondaryRegion = null;
4802
4803    try {
4804      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4805          rootDir, TEST_UTIL.getConfiguration(), htd);
4806
4807      // load some data
4808      putData(primaryRegion, 0, 1000, cq, families);
4809
4810      // flush region
4811      primaryRegion.flush(true);
4812
4813      // open secondary region
4814      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4815
4816      verifyData(secondaryRegion, 0, 1000, cq, families);
4817    } finally {
4818      if (primaryRegion != null) {
4819        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4820      }
4821      if (secondaryRegion != null) {
4822        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4823      }
4824    }
4825  }
4826
4827  @Test
4828  public void testRegionReplicaSecondaryIsReadOnly() throws IOException {
4829    // create a primary region, load some data and flush
4830    // create a secondary region, and do a put against that
4831    Path rootDir = new Path(dir + name.getMethodName());
4832    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4833
4834    byte[][] families = new byte[][] {
4835        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4836    };
4837    byte[] cq = Bytes.toBytes("cq");
4838    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4839    for (byte[] family : families) {
4840      htd.addFamily(new HColumnDescriptor(family));
4841    }
4842
4843    long time = System.currentTimeMillis();
4844    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4845      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4846      false, time, 0);
4847    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4848      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4849      false, time, 1);
4850
4851    HRegion primaryRegion = null, secondaryRegion = null;
4852
4853    try {
4854      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4855          rootDir, TEST_UTIL.getConfiguration(), htd);
4856
4857      // load some data
4858      putData(primaryRegion, 0, 1000, cq, families);
4859
4860      // flush region
4861      primaryRegion.flush(true);
4862
4863      // open secondary region
4864      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4865
4866      try {
4867        putData(secondaryRegion, 0, 1000, cq, families);
4868        fail("Should have thrown exception");
4869      } catch (IOException ex) {
4870        // expected
4871      }
4872    } finally {
4873      if (primaryRegion != null) {
4874        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4875      }
4876      if (secondaryRegion != null) {
4877        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4878      }
4879    }
4880  }
4881
4882  static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
4883    Configuration confForWAL = new Configuration(conf);
4884    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
4885    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8));
4886  }
4887
4888  @Test
4889  public void testCompactionFromPrimary() throws IOException {
4890    Path rootDir = new Path(dir + name.getMethodName());
4891    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4892
4893    byte[][] families = new byte[][] {
4894        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4895    };
4896    byte[] cq = Bytes.toBytes("cq");
4897    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4898    for (byte[] family : families) {
4899      htd.addFamily(new HColumnDescriptor(family));
4900    }
4901
4902    long time = System.currentTimeMillis();
4903    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4904      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4905      false, time, 0);
4906    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4907      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4908      false, time, 1);
4909
4910    HRegion primaryRegion = null, secondaryRegion = null;
4911
4912    try {
4913      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4914          rootDir, TEST_UTIL.getConfiguration(), htd);
4915
4916      // load some data
4917      putData(primaryRegion, 0, 1000, cq, families);
4918
4919      // flush region
4920      primaryRegion.flush(true);
4921
4922      // open secondary region
4923      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4924
4925      // move the file of the primary region to the archive, simulating a compaction
4926      Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
4927      primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
4928      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
4929          .getStoreFiles(families[0]);
4930      Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
4931
4932      verifyData(secondaryRegion, 0, 1000, cq, families);
4933    } finally {
4934      if (primaryRegion != null) {
4935        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4936      }
4937      if (secondaryRegion != null) {
4938        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4939      }
4940    }
4941  }
4942
4943  private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws
4944      IOException {
4945    putData(this.region, startRow, numRows, qf, families);
4946  }
4947
4948  private void putData(HRegion region,
4949      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4950    putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
4951  }
4952
4953  static void putData(HRegion region, Durability durability,
4954      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4955    for (int i = startRow; i < startRow + numRows; i++) {
4956      Put put = new Put(Bytes.toBytes("" + i));
4957      put.setDurability(durability);
4958      for (byte[] family : families) {
4959        put.addColumn(family, qf, null);
4960      }
4961      region.put(put);
4962      LOG.info(put.toString());
4963    }
4964  }
4965
4966  static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
4967      throws IOException {
4968    for (int i = startRow; i < startRow + numRows; i++) {
4969      byte[] row = Bytes.toBytes("" + i);
4970      Get get = new Get(row);
4971      for (byte[] family : families) {
4972        get.addColumn(family, qf);
4973      }
4974      Result result = newReg.get(get);
4975      Cell[] raw = result.rawCells();
4976      assertEquals(families.length, result.size());
4977      for (int j = 0; j < families.length; j++) {
4978        assertTrue(CellUtil.matchingRows(raw[j], row));
4979        assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
4980        assertTrue(CellUtil.matchingQualifier(raw[j], qf));
4981      }
4982    }
4983  }
4984
4985  static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
4986    // Now I have k, get values out and assert they are as expected.
4987    Get get = new Get(k).addFamily(family).setMaxVersions();
4988    Cell[] results = r.get(get).rawCells();
4989    for (int j = 0; j < results.length; j++) {
4990      byte[] tmp = CellUtil.cloneValue(results[j]);
4991      // Row should be equal to value every time.
4992      assertTrue(Bytes.equals(k, tmp));
4993    }
4994  }
4995
4996  /*
4997   * Assert first value in the passed region is <code>firstValue</code>.
4998   *
4999   * @param r
5000   *
5001   * @param fs
5002   *
5003   * @param firstValue
5004   *
5005   * @throws IOException
5006   */
5007  protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
5008      throws IOException {
5009    byte[][] families = { fs };
5010    Scan scan = new Scan();
5011    for (int i = 0; i < families.length; i++)
5012      scan.addFamily(families[i]);
5013    InternalScanner s = r.getScanner(scan);
5014    try {
5015      List<Cell> curVals = new ArrayList<>();
5016      boolean first = true;
5017      OUTER_LOOP: while (s.next(curVals)) {
5018        for (Cell kv : curVals) {
5019          byte[] val = CellUtil.cloneValue(kv);
5020          byte[] curval = val;
5021          if (first) {
5022            first = false;
5023            assertTrue(Bytes.compareTo(curval, firstValue) == 0);
5024          } else {
5025            // Not asserting anything. Might as well break.
5026            break OUTER_LOOP;
5027          }
5028        }
5029      }
5030    } finally {
5031      s.close();
5032    }
5033  }
5034
5035  /**
5036   * Test that we get the expected flush results back
5037   */
5038  @Test
5039  public void testFlushResult() throws IOException {
5040    byte[] family = Bytes.toBytes("family");
5041
5042    this.region = initHRegion(tableName, method, family);
5043
5044    // empty memstore, flush doesn't run
5045    HRegion.FlushResult fr = region.flush(true);
5046    assertFalse(fr.isFlushSucceeded());
5047    assertFalse(fr.isCompactionNeeded());
5048
5049    // Flush enough files to get up to the threshold, doesn't need compactions
5050    for (int i = 0; i < 2; i++) {
5051      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
5052      region.put(put);
5053      fr = region.flush(true);
5054      assertTrue(fr.isFlushSucceeded());
5055      assertFalse(fr.isCompactionNeeded());
5056    }
5057
5058    // Two flushes after the threshold, compactions are needed
5059    for (int i = 0; i < 2; i++) {
5060      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
5061      region.put(put);
5062      fr = region.flush(true);
5063      assertTrue(fr.isFlushSucceeded());
5064      assertTrue(fr.isCompactionNeeded());
5065    }
5066  }
5067
5068  protected Configuration initSplit() {
5069    // Always compact if there is more than one store file.
5070    CONF.setInt("hbase.hstore.compactionThreshold", 2);
5071
5072    CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
5073
5074    // Increase the amount of time between client retries
5075    CONF.setLong("hbase.client.pause", 15 * 1000);
5076
5077    // This size should make it so we always split using the addContent
5078    // below. After adding all data, the first region is 1.3M
5079    CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
5080    return CONF;
5081  }
5082
5083  /**
5084   * @return A region on which you must call
5085   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5086   */
5087  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
5088      byte[]... families) throws IOException {
5089    return initHRegion(tableName, callingMethod, conf, false, families);
5090  }
5091
5092  /**
5093   * @return A region on which you must call
5094   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5095   */
5096  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
5097      boolean isReadOnly, byte[]... families) throws IOException {
5098    return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
5099  }
5100
5101  protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
5102      String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
5103      throws IOException {
5104    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
5105    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
5106    HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
5107    final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
5108    return initHRegion(tableName, startKey, stopKey, isReadOnly,
5109        Durability.SYNC_WAL, wal, families);
5110  }
5111
5112  /**
5113   * @return A region on which you must call
5114   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5115   */
5116  public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
5117      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
5118    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey,
5119        isReadOnly, durability, wal, families);
5120  }
5121
5122  /**
5123   * Assert that the passed in Cell has expected contents for the specified row,
5124   * column & timestamp.
5125   */
5126  private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
5127    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
5128    assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx,
5129        Bytes.toString(CellUtil.cloneRow(kv)));
5130    assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
5131        Bytes.toString(CellUtil.cloneFamily(kv)));
5132    assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
5133        Bytes.toString(CellUtil.cloneQualifier(kv)));
5134    assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
5135    assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
5136        Bytes.toString(CellUtil.cloneValue(kv)));
5137  }
5138
5139  @Test
5140  public void testReverseScanner_FromMemStore_SingleCF_Normal()
5141      throws IOException {
5142    byte[] rowC = Bytes.toBytes("rowC");
5143    byte[] rowA = Bytes.toBytes("rowA");
5144    byte[] rowB = Bytes.toBytes("rowB");
5145    byte[] cf = Bytes.toBytes("CF");
5146    byte[][] families = { cf };
5147    byte[] col = Bytes.toBytes("C");
5148    long ts = 1;
5149    this.region = initHRegion(tableName, method, families);
5150    try {
5151      KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5152      KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5153          null);
5154      KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5155      KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5156      Put put = null;
5157      put = new Put(rowC);
5158      put.add(kv1);
5159      put.add(kv11);
5160      region.put(put);
5161      put = new Put(rowA);
5162      put.add(kv2);
5163      region.put(put);
5164      put = new Put(rowB);
5165      put.add(kv3);
5166      region.put(put);
5167
5168      Scan scan = new Scan(rowC);
5169      scan.setMaxVersions(5);
5170      scan.setReversed(true);
5171      InternalScanner scanner = region.getScanner(scan);
5172      List<Cell> currRow = new ArrayList<>();
5173      boolean hasNext = scanner.next(currRow);
5174      assertEquals(2, currRow.size());
5175      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5176          .get(0).getRowLength(), rowC, 0, rowC.length));
5177      assertTrue(hasNext);
5178      currRow.clear();
5179      hasNext = scanner.next(currRow);
5180      assertEquals(1, currRow.size());
5181      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5182          .get(0).getRowLength(), rowB, 0, rowB.length));
5183      assertTrue(hasNext);
5184      currRow.clear();
5185      hasNext = scanner.next(currRow);
5186      assertEquals(1, currRow.size());
5187      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5188          .get(0).getRowLength(), rowA, 0, rowA.length));
5189      assertFalse(hasNext);
5190      scanner.close();
5191    } finally {
5192      HBaseTestingUtility.closeRegionAndWAL(this.region);
5193      this.region = null;
5194    }
5195  }
5196
5197  @Test
5198  public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
5199      throws IOException {
5200    byte[] rowC = Bytes.toBytes("rowC");
5201    byte[] rowA = Bytes.toBytes("rowA");
5202    byte[] rowB = Bytes.toBytes("rowB");
5203    byte[] rowD = Bytes.toBytes("rowD");
5204    byte[] cf = Bytes.toBytes("CF");
5205    byte[][] families = { cf };
5206    byte[] col = Bytes.toBytes("C");
5207    long ts = 1;
5208    this.region = initHRegion(tableName, method, families);
5209    try {
5210      KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5211      KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5212          null);
5213      KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5214      KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5215      Put put = null;
5216      put = new Put(rowC);
5217      put.add(kv1);
5218      put.add(kv11);
5219      region.put(put);
5220      put = new Put(rowA);
5221      put.add(kv2);
5222      region.put(put);
5223      put = new Put(rowB);
5224      put.add(kv3);
5225      region.put(put);
5226
5227      Scan scan = new Scan(rowD);
5228      List<Cell> currRow = new ArrayList<>();
5229      scan.setReversed(true);
5230      scan.setMaxVersions(5);
5231      InternalScanner scanner = region.getScanner(scan);
5232      boolean hasNext = scanner.next(currRow);
5233      assertEquals(2, currRow.size());
5234      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5235          .get(0).getRowLength(), rowC, 0, rowC.length));
5236      assertTrue(hasNext);
5237      currRow.clear();
5238      hasNext = scanner.next(currRow);
5239      assertEquals(1, currRow.size());
5240      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5241          .get(0).getRowLength(), rowB, 0, rowB.length));
5242      assertTrue(hasNext);
5243      currRow.clear();
5244      hasNext = scanner.next(currRow);
5245      assertEquals(1, currRow.size());
5246      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5247          .get(0).getRowLength(), rowA, 0, rowA.length));
5248      assertFalse(hasNext);
5249      scanner.close();
5250    } finally {
5251      HBaseTestingUtility.closeRegionAndWAL(this.region);
5252      this.region = null;
5253    }
5254  }
5255
5256  @Test
5257  public void testReverseScanner_FromMemStore_SingleCF_FullScan()
5258      throws IOException {
5259    byte[] rowC = Bytes.toBytes("rowC");
5260    byte[] rowA = Bytes.toBytes("rowA");
5261    byte[] rowB = Bytes.toBytes("rowB");
5262    byte[] cf = Bytes.toBytes("CF");
5263    byte[][] families = { cf };
5264    byte[] col = Bytes.toBytes("C");
5265    long ts = 1;
5266    this.region = initHRegion(tableName, method, families);
5267    try {
5268      KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5269      KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5270          null);
5271      KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5272      KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5273      Put put = null;
5274      put = new Put(rowC);
5275      put.add(kv1);
5276      put.add(kv11);
5277      region.put(put);
5278      put = new Put(rowA);
5279      put.add(kv2);
5280      region.put(put);
5281      put = new Put(rowB);
5282      put.add(kv3);
5283      region.put(put);
5284      Scan scan = new Scan();
5285      List<Cell> currRow = new ArrayList<>();
5286      scan.setReversed(true);
5287      InternalScanner scanner = region.getScanner(scan);
5288      boolean hasNext = scanner.next(currRow);
5289      assertEquals(1, currRow.size());
5290      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5291          .get(0).getRowLength(), rowC, 0, rowC.length));
5292      assertTrue(hasNext);
5293      currRow.clear();
5294      hasNext = scanner.next(currRow);
5295      assertEquals(1, currRow.size());
5296      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5297          .get(0).getRowLength(), rowB, 0, rowB.length));
5298      assertTrue(hasNext);
5299      currRow.clear();
5300      hasNext = scanner.next(currRow);
5301      assertEquals(1, currRow.size());
5302      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5303          .get(0).getRowLength(), rowA, 0, rowA.length));
5304      assertFalse(hasNext);
5305      scanner.close();
5306    } finally {
5307      HBaseTestingUtility.closeRegionAndWAL(this.region);
5308      this.region = null;
5309    }
5310  }
5311
5312  @Test
5313  public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
5314    // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
5315    byte[] rowA = Bytes.toBytes("rowA");
5316    byte[] rowB = Bytes.toBytes("rowB");
5317    byte[] rowC = Bytes.toBytes("rowC");
5318    byte[] rowD = Bytes.toBytes("rowD");
5319    byte[] rowE = Bytes.toBytes("rowE");
5320    byte[] cf = Bytes.toBytes("CF");
5321    byte[][] families = { cf };
5322    byte[] col1 = Bytes.toBytes("col1");
5323    byte[] col2 = Bytes.toBytes("col2");
5324    long ts = 1;
5325    this.region = initHRegion(tableName, method, families);
5326    try {
5327      KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5328      KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5329      KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5330      KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5331      KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5332      KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5333      Put put = null;
5334      put = new Put(rowA);
5335      put.add(kv1);
5336      region.put(put);
5337      put = new Put(rowB);
5338      put.add(kv2);
5339      region.put(put);
5340      put = new Put(rowC);
5341      put.add(kv3);
5342      region.put(put);
5343      put = new Put(rowD);
5344      put.add(kv4_1);
5345      region.put(put);
5346      put = new Put(rowD);
5347      put.add(kv4_2);
5348      region.put(put);
5349      put = new Put(rowE);
5350      put.add(kv5);
5351      region.put(put);
5352      region.flush(true);
5353      Scan scan = new Scan(rowD, rowA);
5354      scan.addColumn(families[0], col1);
5355      scan.setReversed(true);
5356      List<Cell> currRow = new ArrayList<>();
5357      InternalScanner scanner = region.getScanner(scan);
5358      boolean hasNext = scanner.next(currRow);
5359      assertEquals(1, currRow.size());
5360      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5361          .get(0).getRowLength(), rowD, 0, rowD.length));
5362      assertTrue(hasNext);
5363      currRow.clear();
5364      hasNext = scanner.next(currRow);
5365      assertEquals(1, currRow.size());
5366      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5367          .get(0).getRowLength(), rowC, 0, rowC.length));
5368      assertTrue(hasNext);
5369      currRow.clear();
5370      hasNext = scanner.next(currRow);
5371      assertEquals(1, currRow.size());
5372      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5373          .get(0).getRowLength(), rowB, 0, rowB.length));
5374      assertFalse(hasNext);
5375      scanner.close();
5376
5377      scan = new Scan(rowD, rowA);
5378      scan.addColumn(families[0], col2);
5379      scan.setReversed(true);
5380      currRow.clear();
5381      scanner = region.getScanner(scan);
5382      hasNext = scanner.next(currRow);
5383      assertEquals(1, currRow.size());
5384      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5385          .get(0).getRowLength(), rowD, 0, rowD.length));
5386      scanner.close();
5387    } finally {
5388      HBaseTestingUtility.closeRegionAndWAL(this.region);
5389      this.region = null;
5390    }
5391  }
5392
5393  @Test
5394  public void testReverseScanner_smaller_blocksize() throws IOException {
5395    // case to ensure no conflict with HFile index optimization
5396    byte[] rowA = Bytes.toBytes("rowA");
5397    byte[] rowB = Bytes.toBytes("rowB");
5398    byte[] rowC = Bytes.toBytes("rowC");
5399    byte[] rowD = Bytes.toBytes("rowD");
5400    byte[] rowE = Bytes.toBytes("rowE");
5401    byte[] cf = Bytes.toBytes("CF");
5402    byte[][] families = { cf };
5403    byte[] col1 = Bytes.toBytes("col1");
5404    byte[] col2 = Bytes.toBytes("col2");
5405    long ts = 1;
5406    HBaseConfiguration config = new HBaseConfiguration();
5407    config.setInt("test.block.size", 1);
5408    this.region = initHRegion(tableName, method, config, families);
5409    try {
5410      KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5411      KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5412      KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5413      KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5414      KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5415      KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5416      Put put = null;
5417      put = new Put(rowA);
5418      put.add(kv1);
5419      region.put(put);
5420      put = new Put(rowB);
5421      put.add(kv2);
5422      region.put(put);
5423      put = new Put(rowC);
5424      put.add(kv3);
5425      region.put(put);
5426      put = new Put(rowD);
5427      put.add(kv4_1);
5428      region.put(put);
5429      put = new Put(rowD);
5430      put.add(kv4_2);
5431      region.put(put);
5432      put = new Put(rowE);
5433      put.add(kv5);
5434      region.put(put);
5435      region.flush(true);
5436      Scan scan = new Scan(rowD, rowA);
5437      scan.addColumn(families[0], col1);
5438      scan.setReversed(true);
5439      List<Cell> currRow = new ArrayList<>();
5440      InternalScanner scanner = region.getScanner(scan);
5441      boolean hasNext = scanner.next(currRow);
5442      assertEquals(1, currRow.size());
5443      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5444          .get(0).getRowLength(), rowD, 0, rowD.length));
5445      assertTrue(hasNext);
5446      currRow.clear();
5447      hasNext = scanner.next(currRow);
5448      assertEquals(1, currRow.size());
5449      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5450          .get(0).getRowLength(), rowC, 0, rowC.length));
5451      assertTrue(hasNext);
5452      currRow.clear();
5453      hasNext = scanner.next(currRow);
5454      assertEquals(1, currRow.size());
5455      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5456          .get(0).getRowLength(), rowB, 0, rowB.length));
5457      assertFalse(hasNext);
5458      scanner.close();
5459
5460      scan = new Scan(rowD, rowA);
5461      scan.addColumn(families[0], col2);
5462      scan.setReversed(true);
5463      currRow.clear();
5464      scanner = region.getScanner(scan);
5465      hasNext = scanner.next(currRow);
5466      assertEquals(1, currRow.size());
5467      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5468          .get(0).getRowLength(), rowD, 0, rowD.length));
5469      scanner.close();
5470    } finally {
5471      HBaseTestingUtility.closeRegionAndWAL(this.region);
5472      this.region = null;
5473    }
5474  }
5475
5476  @Test
5477  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
5478      throws IOException {
5479    byte[] row0 = Bytes.toBytes("row0"); // 1 kv
5480    byte[] row1 = Bytes.toBytes("row1"); // 2 kv
5481    byte[] row2 = Bytes.toBytes("row2"); // 4 kv
5482    byte[] row3 = Bytes.toBytes("row3"); // 2 kv
5483    byte[] row4 = Bytes.toBytes("row4"); // 5 kv
5484    byte[] row5 = Bytes.toBytes("row5"); // 2 kv
5485    byte[] cf1 = Bytes.toBytes("CF1");
5486    byte[] cf2 = Bytes.toBytes("CF2");
5487    byte[] cf3 = Bytes.toBytes("CF3");
5488    byte[][] families = { cf1, cf2, cf3 };
5489    byte[] col = Bytes.toBytes("C");
5490    long ts = 1;
5491    HBaseConfiguration conf = new HBaseConfiguration();
5492    // disable compactions in this test.
5493    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5494    this.region = initHRegion(tableName, method, conf, families);
5495    try {
5496      // kv naming style: kv(row number) totalKvCountInThisRow seq no
5497      KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
5498          null);
5499      KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
5500          null);
5501      KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
5502          KeyValue.Type.Put, null);
5503      KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
5504          null);
5505      KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
5506          null);
5507      KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
5508          null);
5509      KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
5510          KeyValue.Type.Put, null);
5511      KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
5512          null);
5513      KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
5514          KeyValue.Type.Put, null);
5515      KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
5516          null);
5517      KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
5518          null);
5519      KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
5520          KeyValue.Type.Put, null);
5521      KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
5522          null);
5523      KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
5524          KeyValue.Type.Put, null);
5525      KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
5526          null);
5527      KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
5528          null);
5529      // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv)
5530      Put put = null;
5531      put = new Put(row1);
5532      put.add(kv1_2_1);
5533      region.put(put);
5534      put = new Put(row2);
5535      put.add(kv2_4_1);
5536      region.put(put);
5537      put = new Put(row4);
5538      put.add(kv4_5_4);
5539      put.add(kv4_5_5);
5540      region.put(put);
5541      region.flush(true);
5542      // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv)
5543      put = new Put(row4);
5544      put.add(kv4_5_1);
5545      put.add(kv4_5_3);
5546      region.put(put);
5547      put = new Put(row1);
5548      put.add(kv1_2_2);
5549      region.put(put);
5550      put = new Put(row2);
5551      put.add(kv2_4_4);
5552      region.put(put);
5553      region.flush(true);
5554      // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv)
5555      put = new Put(row4);
5556      put.add(kv4_5_2);
5557      region.put(put);
5558      put = new Put(row2);
5559      put.add(kv2_4_2);
5560      put.add(kv2_4_3);
5561      region.put(put);
5562      put = new Put(row3);
5563      put.add(kv3_2_2);
5564      region.put(put);
5565      region.flush(true);
5566      // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max)
5567      // ( 2 kv)
5568      put = new Put(row0);
5569      put.add(kv0_1_1);
5570      region.put(put);
5571      put = new Put(row3);
5572      put.add(kv3_2_1);
5573      region.put(put);
5574      put = new Put(row5);
5575      put.add(kv5_2_1);
5576      put.add(kv5_2_2);
5577      region.put(put);
5578      // scan range = ["row4", min), skip the max "row5"
5579      Scan scan = new Scan(row4);
5580      scan.setMaxVersions(5);
5581      scan.setBatch(3);
5582      scan.setReversed(true);
5583      InternalScanner scanner = region.getScanner(scan);
5584      List<Cell> currRow = new ArrayList<>();
5585      boolean hasNext = false;
5586      // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
5587      // included in scan range
5588      // "row4" takes 2 next() calls since batch=3
5589      hasNext = scanner.next(currRow);
5590      assertEquals(3, currRow.size());
5591      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5592          .get(0).getRowLength(), row4, 0, row4.length));
5593      assertTrue(hasNext);
5594      currRow.clear();
5595      hasNext = scanner.next(currRow);
5596      assertEquals(2, currRow.size());
5597      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
5598          currRow.get(0).getRowLength(), row4, 0,
5599        row4.length));
5600      assertTrue(hasNext);
5601      // 2. scan out "row3" (2 kv)
5602      currRow.clear();
5603      hasNext = scanner.next(currRow);
5604      assertEquals(2, currRow.size());
5605      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5606          .get(0).getRowLength(), row3, 0, row3.length));
5607      assertTrue(hasNext);
5608      // 3. scan out "row2" (4 kvs)
5609      // "row2" takes 2 next() calls since batch=3
5610      currRow.clear();
5611      hasNext = scanner.next(currRow);
5612      assertEquals(3, currRow.size());
5613      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5614          .get(0).getRowLength(), row2, 0, row2.length));
5615      assertTrue(hasNext);
5616      currRow.clear();
5617      hasNext = scanner.next(currRow);
5618      assertEquals(1, currRow.size());
5619      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5620          .get(0).getRowLength(), row2, 0, row2.length));
5621      assertTrue(hasNext);
5622      // 4. scan out "row1" (2 kv)
5623      currRow.clear();
5624      hasNext = scanner.next(currRow);
5625      assertEquals(2, currRow.size());
5626      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5627          .get(0).getRowLength(), row1, 0, row1.length));
5628      assertTrue(hasNext);
5629      // 5. scan out "row0" (1 kv)
5630      currRow.clear();
5631      hasNext = scanner.next(currRow);
5632      assertEquals(1, currRow.size());
5633      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5634          .get(0).getRowLength(), row0, 0, row0.length));
5635      assertFalse(hasNext);
5636
5637      scanner.close();
5638    } finally {
5639      HBaseTestingUtility.closeRegionAndWAL(this.region);
5640      this.region = null;
5641    }
5642  }
5643
5644  @Test
5645  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
5646      throws IOException {
5647    byte[] row1 = Bytes.toBytes("row1");
5648    byte[] row2 = Bytes.toBytes("row2");
5649    byte[] row3 = Bytes.toBytes("row3");
5650    byte[] row4 = Bytes.toBytes("row4");
5651    byte[] cf1 = Bytes.toBytes("CF1");
5652    byte[] cf2 = Bytes.toBytes("CF2");
5653    byte[] cf3 = Bytes.toBytes("CF3");
5654    byte[] cf4 = Bytes.toBytes("CF4");
5655    byte[][] families = { cf1, cf2, cf3, cf4 };
5656    byte[] col = Bytes.toBytes("C");
5657    long ts = 1;
5658    HBaseConfiguration conf = new HBaseConfiguration();
5659    // disable compactions in this test.
5660    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5661    this.region = initHRegion(tableName, method, conf, families);
5662    try {
5663      KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
5664      KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
5665      KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
5666      KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
5667      // storefile1
5668      Put put = new Put(row1);
5669      put.add(kv1);
5670      region.put(put);
5671      region.flush(true);
5672      // storefile2
5673      put = new Put(row2);
5674      put.add(kv2);
5675      region.put(put);
5676      region.flush(true);
5677      // storefile3
5678      put = new Put(row3);
5679      put.add(kv3);
5680      region.put(put);
5681      region.flush(true);
5682      // memstore
5683      put = new Put(row4);
5684      put.add(kv4);
5685      region.put(put);
5686      // scan range = ["row4", min)
5687      Scan scan = new Scan(row4);
5688      scan.setReversed(true);
5689      scan.setBatch(10);
5690      InternalScanner scanner = region.getScanner(scan);
5691      List<Cell> currRow = new ArrayList<>();
5692      boolean hasNext = scanner.next(currRow);
5693      assertEquals(1, currRow.size());
5694      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5695          .get(0).getRowLength(), row4, 0, row4.length));
5696      assertTrue(hasNext);
5697      currRow.clear();
5698      hasNext = scanner.next(currRow);
5699      assertEquals(1, currRow.size());
5700      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5701          .get(0).getRowLength(), row3, 0, row3.length));
5702      assertTrue(hasNext);
5703      currRow.clear();
5704      hasNext = scanner.next(currRow);
5705      assertEquals(1, currRow.size());
5706      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5707          .get(0).getRowLength(), row2, 0, row2.length));
5708      assertTrue(hasNext);
5709      currRow.clear();
5710      hasNext = scanner.next(currRow);
5711      assertEquals(1, currRow.size());
5712      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5713          .get(0).getRowLength(), row1, 0, row1.length));
5714      assertFalse(hasNext);
5715    } finally {
5716      HBaseTestingUtility.closeRegionAndWAL(this.region);
5717      this.region = null;
5718    }
5719  }
5720
5721  /**
5722   * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking
5723   */
5724  @Test
5725  public void testReverseScanner_StackOverflow() throws IOException {
5726    byte[] cf1 = Bytes.toBytes("CF1");
5727    byte[][] families = {cf1};
5728    byte[] col = Bytes.toBytes("C");
5729    HBaseConfiguration conf = new HBaseConfiguration();
5730    this.region = initHRegion(tableName, method, conf, families);
5731    try {
5732      // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5733      Put put = new Put(Bytes.toBytes("19998"));
5734      put.addColumn(cf1, col, Bytes.toBytes("val"));
5735      region.put(put);
5736      region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5737      Put put2 = new Put(Bytes.toBytes("19997"));
5738      put2.addColumn(cf1, col, Bytes.toBytes("val"));
5739      region.put(put2);
5740
5741      Scan scan = new Scan(Bytes.toBytes("19998"));
5742      scan.setReversed(true);
5743      InternalScanner scanner = region.getScanner(scan);
5744
5745      // create one storefile contains many rows will be skipped
5746      // to check StoreFileScanner.seekToPreviousRow
5747      for (int i = 10000; i < 20000; i++) {
5748        Put p = new Put(Bytes.toBytes(""+i));
5749        p.addColumn(cf1, col, Bytes.toBytes("" + i));
5750        region.put(p);
5751      }
5752      region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5753
5754      // create one memstore contains many rows will be skipped
5755      // to check MemStoreScanner.seekToPreviousRow
5756      for (int i = 10000; i < 20000; i++) {
5757        Put p = new Put(Bytes.toBytes(""+i));
5758        p.addColumn(cf1, col, Bytes.toBytes("" + i));
5759        region.put(p);
5760      }
5761
5762      List<Cell> currRow = new ArrayList<>();
5763      boolean hasNext;
5764      do {
5765        hasNext = scanner.next(currRow);
5766      } while (hasNext);
5767      assertEquals(2, currRow.size());
5768      assertEquals("19998", Bytes.toString(currRow.get(0).getRowArray(),
5769        currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5770      assertEquals("19997", Bytes.toString(currRow.get(1).getRowArray(),
5771        currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5772    } finally {
5773      HBaseTestingUtility.closeRegionAndWAL(this.region);
5774      this.region = null;
5775    }
5776  }
5777
5778  @Test
5779  public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception {
5780    byte[] cf1 = Bytes.toBytes("CF1");
5781    byte[][] families = { cf1 };
5782    byte[] col = Bytes.toBytes("C");
5783    HBaseConfiguration conf = new HBaseConfiguration();
5784    this.region = initHRegion(tableName, method, conf, families);
5785    try {
5786      // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5787      Put put = new Put(Bytes.toBytes("19996"));
5788      put.addColumn(cf1, col, Bytes.toBytes("val"));
5789      region.put(put);
5790      Put put2 = new Put(Bytes.toBytes("19995"));
5791      put2.addColumn(cf1, col, Bytes.toBytes("val"));
5792      region.put(put2);
5793      // create a reverse scan
5794      Scan scan = new Scan(Bytes.toBytes("19996"));
5795      scan.setReversed(true);
5796      RegionScannerImpl scanner = region.getScanner(scan);
5797
5798      // flush the cache. This will reset the store scanner
5799      region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5800
5801      // create one memstore contains many rows will be skipped
5802      // to check MemStoreScanner.seekToPreviousRow
5803      for (int i = 10000; i < 20000; i++) {
5804        Put p = new Put(Bytes.toBytes("" + i));
5805        p.addColumn(cf1, col, Bytes.toBytes("" + i));
5806        region.put(p);
5807      }
5808      List<Cell> currRow = new ArrayList<>();
5809      boolean hasNext;
5810      boolean assertDone = false;
5811      do {
5812        hasNext = scanner.next(currRow);
5813        // With HBASE-15871, after the scanner is reset the memstore scanner should not be
5814        // added here
5815        if (!assertDone) {
5816          StoreScanner current =
5817              (StoreScanner) (scanner.storeHeap).getCurrentForTesting();
5818          List<KeyValueScanner> scanners = current.getAllScannersForTesting();
5819          assertEquals("There should be only one scanner the store file scanner", 1,
5820            scanners.size());
5821          assertDone = true;
5822        }
5823      } while (hasNext);
5824      assertEquals(2, currRow.size());
5825      assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(),
5826        currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5827      assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(),
5828        currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5829    } finally {
5830      HBaseTestingUtility.closeRegionAndWAL(this.region);
5831      this.region = null;
5832    }
5833  }
5834
5835  @Test
5836  public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception {
5837    byte[] cf1 = Bytes.toBytes("CF1");
5838    byte[][] families = { cf1 };
5839    byte[] col = Bytes.toBytes("C");
5840
5841    HBaseConfiguration conf = new HBaseConfiguration();
5842    this.region = initHRegion(tableName, method, conf, families);
5843
5844    Put put = new Put(Bytes.toBytes("199996"));
5845    put.addColumn(cf1, col, Bytes.toBytes("val"));
5846    region.put(put);
5847    Put put2 = new Put(Bytes.toBytes("199995"));
5848    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5849    region.put(put2);
5850
5851    // Create a reverse scan
5852    Scan scan = new Scan(Bytes.toBytes("199996"));
5853    scan.setReversed(true);
5854    RegionScannerImpl scanner = region.getScanner(scan);
5855
5856    // Put a lot of cells that have sequenceIDs grater than the readPt of the reverse scan
5857    for (int i = 100000; i < 200000; i++) {
5858      Put p = new Put(Bytes.toBytes("" + i));
5859      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5860      region.put(p);
5861    }
5862    List<Cell> currRow = new ArrayList<>();
5863    boolean hasNext;
5864    do {
5865      hasNext = scanner.next(currRow);
5866    } while (hasNext);
5867
5868    assertEquals(2, currRow.size());
5869    assertEquals("199996", Bytes.toString(currRow.get(0).getRowArray(),
5870      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5871    assertEquals("199995", Bytes.toString(currRow.get(1).getRowArray(),
5872      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5873  }
5874
5875  @Test
5876  public void testWriteRequestsCounter() throws IOException {
5877    byte[] fam = Bytes.toBytes("info");
5878    byte[][] families = { fam };
5879    this.region = initHRegion(tableName, method, CONF, families);
5880
5881    Assert.assertEquals(0L, region.getWriteRequestsCount());
5882
5883    Put put = new Put(row);
5884    put.addColumn(fam, fam, fam);
5885
5886    Assert.assertEquals(0L, region.getWriteRequestsCount());
5887    region.put(put);
5888    Assert.assertEquals(1L, region.getWriteRequestsCount());
5889    region.put(put);
5890    Assert.assertEquals(2L, region.getWriteRequestsCount());
5891    region.put(put);
5892    Assert.assertEquals(3L, region.getWriteRequestsCount());
5893
5894    region.delete(new Delete(row));
5895    Assert.assertEquals(4L, region.getWriteRequestsCount());
5896
5897    HBaseTestingUtility.closeRegionAndWAL(this.region);
5898    this.region = null;
5899  }
5900
5901  @Test
5902  public void testOpenRegionWrittenToWAL() throws Exception {
5903    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
5904    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5905
5906    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
5907    htd.addFamily(new HColumnDescriptor(fam1));
5908    htd.addFamily(new HColumnDescriptor(fam2));
5909
5910    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
5911      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
5912
5913    // open the region w/o rss and wal and flush some files
5914    HRegion region =
5915         HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
5916             .getConfiguration(), htd);
5917    assertNotNull(region);
5918
5919    // create a file in fam1 for the region before opening in OpenRegionHandler
5920    region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1));
5921    region.flush(true);
5922    HBaseTestingUtility.closeRegionAndWAL(region);
5923
5924    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5925
5926    // capture append() calls
5927    WAL wal = mockWAL();
5928    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
5929
5930    try {
5931      region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5932        TEST_UTIL.getConfiguration(), rss, null);
5933
5934      verify(wal, times(1)).append((HRegionInfo)any(), (WALKeyImpl)any()
5935        , editCaptor.capture(), anyBoolean());
5936
5937      WALEdit edit = editCaptor.getValue();
5938      assertNotNull(edit);
5939      assertNotNull(edit.getCells());
5940      assertEquals(1, edit.getCells().size());
5941      RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5942      assertNotNull(desc);
5943
5944      LOG.info("RegionEventDescriptor from WAL: " + desc);
5945
5946      assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
5947      assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
5948      assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5949        hri.getEncodedNameAsBytes()));
5950      assertTrue(desc.getLogSequenceNumber() > 0);
5951      assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5952      assertEquals(2, desc.getStoresCount());
5953
5954      StoreDescriptor store = desc.getStores(0);
5955      assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
5956      assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
5957      assertEquals(1, store.getStoreFileCount()); // 1store file
5958      assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
5959
5960      store = desc.getStores(1);
5961      assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
5962      assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
5963      assertEquals(0, store.getStoreFileCount()); // no store files
5964
5965    } finally {
5966      HBaseTestingUtility.closeRegionAndWAL(region);
5967    }
5968  }
5969
5970  // Helper for test testOpenRegionWrittenToWALForLogReplay
5971  static class HRegionWithSeqId extends HRegion {
5972    public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
5973        final Configuration confParam, final RegionInfo regionInfo,
5974        final TableDescriptor htd, final RegionServerServices rsServices) {
5975      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
5976    }
5977    @Override
5978    protected long getNextSequenceId(WAL wal) throws IOException {
5979      return 42;
5980    }
5981  }
5982
5983  @Test
5984  public void testFlushedFileWithNoTags() throws Exception {
5985    final TableName tableName = TableName.valueOf(name.getMethodName());
5986    HTableDescriptor htd = new HTableDescriptor(tableName);
5987    htd.addFamily(new HColumnDescriptor(fam1));
5988    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
5989    Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName());
5990    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
5991    Put put = new Put(Bytes.toBytes("a-b-0-0"));
5992    put.addColumn(fam1, qual1, Bytes.toBytes("c1-value"));
5993    region.put(put);
5994    region.flush(true);
5995    HStore store = region.getStore(fam1);
5996    Collection<HStoreFile> storefiles = store.getStorefiles();
5997    for (HStoreFile sf : storefiles) {
5998      assertFalse("Tags should not be present "
5999          ,sf.getReader().getHFileReader().getFileContext().isIncludesTags());
6000    }
6001  }
6002
6003  /**
6004   * Utility method to setup a WAL mock.
6005   * Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs.
6006   * @return
6007   * @throws IOException
6008   */
6009  private WAL mockWAL() throws IOException {
6010    WAL wal = mock(WAL.class);
6011    Mockito.when(wal.append((HRegionInfo)Mockito.any(),
6012        (WALKeyImpl)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())).
6013      thenAnswer(new Answer<Long>() {
6014        @Override
6015        public Long answer(InvocationOnMock invocation) throws Throwable {
6016          WALKeyImpl key = invocation.getArgument(1);
6017          MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
6018          key.setWriteEntry(we);
6019          return 1L;
6020        }
6021
6022    });
6023    return wal;
6024  }
6025
6026  @Test
6027  public void testCloseRegionWrittenToWAL() throws Exception {
6028
6029    Path rootDir = new Path(dir + name.getMethodName());
6030    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
6031
6032    final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
6033    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6034
6035    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
6036    htd.addFamily(new HColumnDescriptor(fam1));
6037    htd.addFamily(new HColumnDescriptor(fam2));
6038
6039    final HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6040      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6041
6042    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
6043
6044    // capture append() calls
6045    WAL wal = mockWAL();
6046    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
6047
6048
6049    // create and then open a region first so that it can be closed later
6050    region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri));
6051    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
6052      TEST_UTIL.getConfiguration(), rss, null);
6053
6054    // close the region
6055    region.close(false);
6056
6057    // 2 times, one for region open, the other close region
6058    verify(wal, times(2)).append((HRegionInfo)any(), (WALKeyImpl)any(),
6059      editCaptor.capture(), anyBoolean());
6060
6061    WALEdit edit = editCaptor.getAllValues().get(1);
6062    assertNotNull(edit);
6063    assertNotNull(edit.getCells());
6064    assertEquals(1, edit.getCells().size());
6065    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
6066    assertNotNull(desc);
6067
6068    LOG.info("RegionEventDescriptor from WAL: " + desc);
6069
6070    assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType());
6071    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
6072    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
6073      hri.getEncodedNameAsBytes()));
6074    assertTrue(desc.getLogSequenceNumber() > 0);
6075    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
6076    assertEquals(2, desc.getStoresCount());
6077
6078    StoreDescriptor store = desc.getStores(0);
6079    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
6080    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
6081    assertEquals(0, store.getStoreFileCount()); // no store files
6082
6083    store = desc.getStores(1);
6084    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
6085    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
6086    assertEquals(0, store.getStoreFileCount()); // no store files
6087  }
6088
6089  /**
6090   * Test RegionTooBusyException thrown when region is busy
6091   */
6092  @Test
6093  public void testRegionTooBusy() throws IOException {
6094    byte[] family = Bytes.toBytes("family");
6095    long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration",
6096      HRegion.DEFAULT_BUSY_WAIT_DURATION);
6097    CONF.setLong("hbase.busy.wait.duration", 1000);
6098    region = initHRegion(tableName, method, CONF, family);
6099    final AtomicBoolean stopped = new AtomicBoolean(true);
6100    Thread t = new Thread(new Runnable() {
6101      @Override
6102      public void run() {
6103        try {
6104          region.lock.writeLock().lock();
6105          stopped.set(false);
6106          while (!stopped.get()) {
6107            Thread.sleep(100);
6108          }
6109        } catch (InterruptedException ie) {
6110        } finally {
6111          region.lock.writeLock().unlock();
6112        }
6113      }
6114    });
6115    t.start();
6116    Get get = new Get(row);
6117    try {
6118      while (stopped.get()) {
6119        Thread.sleep(100);
6120      }
6121      region.get(get);
6122      fail("Should throw RegionTooBusyException");
6123    } catch (InterruptedException ie) {
6124      fail("test interrupted");
6125    } catch (RegionTooBusyException e) {
6126      // Good, expected
6127    } finally {
6128      stopped.set(true);
6129      try {
6130        t.join();
6131      } catch (Throwable e) {
6132      }
6133
6134      HBaseTestingUtility.closeRegionAndWAL(region);
6135      region = null;
6136      CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration);
6137    }
6138  }
6139
6140  @Test
6141  public void testCellTTLs() throws IOException {
6142    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
6143    EnvironmentEdgeManager.injectEdge(edge);
6144
6145    final byte[] row = Bytes.toBytes("testRow");
6146    final byte[] q1 = Bytes.toBytes("q1");
6147    final byte[] q2 = Bytes.toBytes("q2");
6148    final byte[] q3 = Bytes.toBytes("q3");
6149    final byte[] q4 = Bytes.toBytes("q4");
6150
6151    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
6152    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
6153    hcd.setTimeToLive(10); // 10 seconds
6154    htd.addFamily(hcd);
6155
6156    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
6157    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
6158
6159    HRegion region = HBaseTestingUtility.createRegionAndWAL(new HRegionInfo(htd.getTableName(),
6160            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
6161        TEST_UTIL.getDataTestDir(), conf, htd);
6162    assertNotNull(region);
6163    try {
6164      long now = EnvironmentEdgeManager.currentTime();
6165      // Add a cell that will expire in 5 seconds via cell TTL
6166      region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
6167        HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
6168          // TTL tags specify ts in milliseconds
6169          new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
6170      // Add a cell that will expire after 10 seconds via family setting
6171      region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
6172      // Add a cell that will expire in 15 seconds via cell TTL
6173      region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
6174        HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
6175          // TTL tags specify ts in milliseconds
6176          new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
6177      // Add a cell that will expire in 20 seconds via family setting
6178      region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
6179
6180      // Flush so we are sure store scanning gets this right
6181      region.flush(true);
6182
6183      // A query at time T+0 should return all cells
6184      Result r = region.get(new Get(row));
6185      assertNotNull(r.getValue(fam1, q1));
6186      assertNotNull(r.getValue(fam1, q2));
6187      assertNotNull(r.getValue(fam1, q3));
6188      assertNotNull(r.getValue(fam1, q4));
6189
6190      // Increment time to T+5 seconds
6191      edge.incrementTime(5000);
6192
6193      r = region.get(new Get(row));
6194      assertNull(r.getValue(fam1, q1));
6195      assertNotNull(r.getValue(fam1, q2));
6196      assertNotNull(r.getValue(fam1, q3));
6197      assertNotNull(r.getValue(fam1, q4));
6198
6199      // Increment time to T+10 seconds
6200      edge.incrementTime(5000);
6201
6202      r = region.get(new Get(row));
6203      assertNull(r.getValue(fam1, q1));
6204      assertNull(r.getValue(fam1, q2));
6205      assertNotNull(r.getValue(fam1, q3));
6206      assertNotNull(r.getValue(fam1, q4));
6207
6208      // Increment time to T+15 seconds
6209      edge.incrementTime(5000);
6210
6211      r = region.get(new Get(row));
6212      assertNull(r.getValue(fam1, q1));
6213      assertNull(r.getValue(fam1, q2));
6214      assertNull(r.getValue(fam1, q3));
6215      assertNotNull(r.getValue(fam1, q4));
6216
6217      // Increment time to T+20 seconds
6218      edge.incrementTime(10000);
6219
6220      r = region.get(new Get(row));
6221      assertNull(r.getValue(fam1, q1));
6222      assertNull(r.getValue(fam1, q2));
6223      assertNull(r.getValue(fam1, q3));
6224      assertNull(r.getValue(fam1, q4));
6225
6226      // Fun with disappearing increments
6227
6228      // Start at 1
6229      region.put(new Put(row).addColumn(fam1, q1, Bytes.toBytes(1L)));
6230      r = region.get(new Get(row));
6231      byte[] val = r.getValue(fam1, q1);
6232      assertNotNull(val);
6233      assertEquals(1L, Bytes.toLong(val));
6234
6235      // Increment with a TTL of 5 seconds
6236      Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
6237      incr.setTTL(5000);
6238      region.increment(incr); // 2
6239
6240      // New value should be 2
6241      r = region.get(new Get(row));
6242      val = r.getValue(fam1, q1);
6243      assertNotNull(val);
6244      assertEquals(2L, Bytes.toLong(val));
6245
6246      // Increment time to T+25 seconds
6247      edge.incrementTime(5000);
6248
6249      // Value should be back to 1
6250      r = region.get(new Get(row));
6251      val = r.getValue(fam1, q1);
6252      assertNotNull(val);
6253      assertEquals(1L, Bytes.toLong(val));
6254
6255      // Increment time to T+30 seconds
6256      edge.incrementTime(5000);
6257
6258      // Original value written at T+20 should be gone now via family TTL
6259      r = region.get(new Get(row));
6260      assertNull(r.getValue(fam1, q1));
6261
6262    } finally {
6263      HBaseTestingUtility.closeRegionAndWAL(region);
6264    }
6265  }
6266
6267  @Test
6268  public void testIncrementTimestampsAreMonotonic() throws IOException {
6269    HRegion region = initHRegion(tableName, method, CONF, fam1);
6270    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6271    EnvironmentEdgeManager.injectEdge(edge);
6272
6273    edge.setValue(10);
6274    Increment inc = new Increment(row);
6275    inc.setDurability(Durability.SKIP_WAL);
6276    inc.addColumn(fam1, qual1, 1L);
6277    region.increment(inc);
6278
6279    Result result = region.get(new Get(row));
6280    Cell c = result.getColumnLatestCell(fam1, qual1);
6281    assertNotNull(c);
6282    assertEquals(10L, c.getTimestamp());
6283
6284    edge.setValue(1); // clock goes back
6285    region.increment(inc);
6286    result = region.get(new Get(row));
6287    c = result.getColumnLatestCell(fam1, qual1);
6288    assertEquals(11L, c.getTimestamp());
6289    assertEquals(2L, Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6290  }
6291
6292  @Test
6293  public void testAppendTimestampsAreMonotonic() throws IOException {
6294    HRegion region = initHRegion(tableName, method, CONF, fam1);
6295    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6296    EnvironmentEdgeManager.injectEdge(edge);
6297
6298    edge.setValue(10);
6299    Append a = new Append(row);
6300    a.setDurability(Durability.SKIP_WAL);
6301    a.addColumn(fam1, qual1, qual1);
6302    region.append(a);
6303
6304    Result result = region.get(new Get(row));
6305    Cell c = result.getColumnLatestCell(fam1, qual1);
6306    assertNotNull(c);
6307    assertEquals(10L, c.getTimestamp());
6308
6309    edge.setValue(1); // clock goes back
6310    region.append(a);
6311    result = region.get(new Get(row));
6312    c = result.getColumnLatestCell(fam1, qual1);
6313    assertEquals(11L, c.getTimestamp());
6314
6315    byte[] expected = new byte[qual1.length*2];
6316    System.arraycopy(qual1, 0, expected, 0, qual1.length);
6317    System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
6318
6319    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6320      expected, 0, expected.length));
6321  }
6322
6323  @Test
6324  public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
6325    HRegion region = initHRegion(tableName, method, CONF, fam1);
6326    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6327    EnvironmentEdgeManager.injectEdge(edge);
6328
6329    edge.setValue(10);
6330    Put p = new Put(row);
6331    p.setDurability(Durability.SKIP_WAL);
6332    p.addColumn(fam1, qual1, qual1);
6333    region.put(p);
6334
6335    Result result = region.get(new Get(row));
6336    Cell c = result.getColumnLatestCell(fam1, qual1);
6337    assertNotNull(c);
6338    assertEquals(10L, c.getTimestamp());
6339
6340    edge.setValue(1); // clock goes back
6341    p = new Put(row);
6342    p.setDurability(Durability.SKIP_WAL);
6343    p.addColumn(fam1, qual1, qual2);
6344    region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p);
6345    result = region.get(new Get(row));
6346    c = result.getColumnLatestCell(fam1, qual1);
6347    assertEquals(10L, c.getTimestamp());
6348
6349    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6350      qual2, 0, qual2.length));
6351  }
6352
6353  @Test
6354  public void testBatchMutateWithWrongRegionException() throws Exception {
6355    final byte[] a = Bytes.toBytes("a");
6356    final byte[] b = Bytes.toBytes("b");
6357    final byte[] c = Bytes.toBytes("c"); // exclusive
6358
6359    int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
6360    CONF.setInt("hbase.rowlock.wait.duration", 1000);
6361    final HRegion region = initHRegion(tableName, a, c, method, CONF, false, fam1);
6362
6363    Mutation[] mutations = new Mutation[] {
6364        new Put(a)
6365            .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6366              .setRow(a)
6367              .setFamily(fam1)
6368              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6369              .setType(Cell.Type.Put)
6370              .build()),
6371        // this is outside the region boundary
6372        new Put(c).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6373              .setRow(c)
6374              .setFamily(fam1)
6375              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6376              .setType(Type.Put)
6377              .build()),
6378        new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6379              .setRow(b)
6380              .setFamily(fam1)
6381              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6382              .setType(Cell.Type.Put)
6383              .build())
6384    };
6385
6386    OperationStatus[] status = region.batchMutate(mutations);
6387    assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6388    assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, status[1].getOperationStatusCode());
6389    assertEquals(OperationStatusCode.SUCCESS, status[2].getOperationStatusCode());
6390
6391
6392    // test with a row lock held for a long time
6393    final CountDownLatch obtainedRowLock = new CountDownLatch(1);
6394    ExecutorService exec = Executors.newFixedThreadPool(2);
6395    Future<Void> f1 = exec.submit(new Callable<Void>() {
6396      @Override
6397      public Void call() throws Exception {
6398        LOG.info("Acquiring row lock");
6399        RowLock rl = region.getRowLock(b);
6400        obtainedRowLock.countDown();
6401        LOG.info("Waiting for 5 seconds before releasing lock");
6402        Threads.sleep(5000);
6403        LOG.info("Releasing row lock");
6404        rl.release();
6405        return null;
6406      }
6407    });
6408    obtainedRowLock.await(30, TimeUnit.SECONDS);
6409
6410    Future<Void> f2 = exec.submit(new Callable<Void>() {
6411      @Override
6412      public Void call() throws Exception {
6413        Mutation[] mutations = new Mutation[] {
6414            new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6415                .setRow(a)
6416                .setFamily(fam1)
6417                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6418                .setType(Cell.Type.Put)
6419                .build()),
6420            new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6421                .setRow(b)
6422                .setFamily(fam1)
6423                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6424                .setType(Cell.Type.Put)
6425                .build()),
6426        };
6427
6428        // this will wait for the row lock, and it will eventually succeed
6429        OperationStatus[] status = region.batchMutate(mutations);
6430        assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6431        assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode());
6432        return null;
6433      }
6434    });
6435
6436    f1.get();
6437    f2.get();
6438
6439    CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
6440  }
6441
6442  @Test
6443  public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
6444    HRegion region = initHRegion(tableName, method, CONF, fam1);
6445    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6446    EnvironmentEdgeManager.injectEdge(edge);
6447
6448    edge.setValue(10);
6449    Put p = new Put(row);
6450    p.setDurability(Durability.SKIP_WAL);
6451    p.addColumn(fam1, qual1, qual1);
6452    region.put(p);
6453
6454    Result result = region.get(new Get(row));
6455    Cell c = result.getColumnLatestCell(fam1, qual1);
6456    assertNotNull(c);
6457    assertEquals(10L, c.getTimestamp());
6458
6459    edge.setValue(1); // clock goes back
6460    p = new Put(row);
6461    p.setDurability(Durability.SKIP_WAL);
6462    p.addColumn(fam1, qual1, qual2);
6463    RowMutations rm = new RowMutations(row);
6464    rm.add(p);
6465    assertTrue(region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL,
6466        new BinaryComparator(qual1), rm));
6467    result = region.get(new Get(row));
6468    c = result.getColumnLatestCell(fam1, qual1);
6469    assertEquals(10L, c.getTimestamp());
6470    LOG.info("c value " +
6471      Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6472
6473    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6474      qual2, 0, qual2.length));
6475  }
6476
6477  HRegion initHRegion(TableName tableName, String callingMethod,
6478      byte[]... families) throws IOException {
6479    return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
6480        families);
6481  }
6482
6483  /**
6484   * HBASE-16429 Make sure no stuck if roll writer when ring buffer is filled with appends
6485   * @throws IOException if IO error occurred during test
6486   */
6487  @Test
6488  public void testWritesWhileRollWriter() throws IOException {
6489    int testCount = 10;
6490    int numRows = 1024;
6491    int numFamilies = 2;
6492    int numQualifiers = 2;
6493    final byte[][] families = new byte[numFamilies][];
6494    for (int i = 0; i < numFamilies; i++) {
6495      families[i] = Bytes.toBytes("family" + i);
6496    }
6497    final byte[][] qualifiers = new byte[numQualifiers][];
6498    for (int i = 0; i < numQualifiers; i++) {
6499      qualifiers[i] = Bytes.toBytes("qual" + i);
6500    }
6501
6502    CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
6503    this.region = initHRegion(tableName, method, CONF, families);
6504    try {
6505      List<Thread> threads = new ArrayList<>();
6506      for (int i = 0; i < numRows; i++) {
6507        final int count = i;
6508        Thread t = new Thread(new Runnable() {
6509
6510          @Override
6511          public void run() {
6512            byte[] row = Bytes.toBytes("row" + count);
6513            Put put = new Put(row);
6514            put.setDurability(Durability.SYNC_WAL);
6515            byte[] value = Bytes.toBytes(String.valueOf(count));
6516            for (byte[] family : families) {
6517              for (byte[] qualifier : qualifiers) {
6518                put.addColumn(family, qualifier, count, value);
6519              }
6520            }
6521            try {
6522              region.put(put);
6523            } catch (IOException e) {
6524              throw new RuntimeException(e);
6525            }
6526          }
6527        });
6528        threads.add(t);
6529      }
6530      for (Thread t : threads) {
6531        t.start();
6532      }
6533
6534      for (int i = 0; i < testCount; i++) {
6535        region.getWAL().rollWriter();
6536        Thread.yield();
6537      }
6538    } finally {
6539      try {
6540        HBaseTestingUtility.closeRegionAndWAL(this.region);
6541        CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
6542      } catch (DroppedSnapshotException dse) {
6543        // We could get this on way out because we interrupt the background flusher and it could
6544        // fail anywhere causing a DSE over in the background flusher... only it is not properly
6545        // dealt with so could still be memory hanging out when we get to here -- memory we can't
6546        // flush because the accounting is 'off' since original DSE.
6547      }
6548      this.region = null;
6549    }
6550  }
6551
6552  @Test
6553  public void testMutateRow_WriteRequestCount() throws Exception {
6554    byte[] row1 = Bytes.toBytes("row1");
6555    byte[] fam1 = Bytes.toBytes("fam1");
6556    byte[] qf1 = Bytes.toBytes("qualifier");
6557    byte[] val1 = Bytes.toBytes("value1");
6558
6559    RowMutations rm = new RowMutations(row1);
6560    Put put = new Put(row1);
6561    put.addColumn(fam1, qf1, val1);
6562    rm.add(put);
6563
6564    this.region = initHRegion(tableName, method, CONF, fam1);
6565    try {
6566      long wrcBeforeMutate = this.region.writeRequestsCount.longValue();
6567      this.region.mutateRow(rm);
6568      long wrcAfterMutate = this.region.writeRequestsCount.longValue();
6569      Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate);
6570    } finally {
6571      HBaseTestingUtility.closeRegionAndWAL(this.region);
6572      this.region = null;
6573    }
6574  }
6575
6576  @Test
6577  public void testBulkLoadReplicationEnabled() throws IOException {
6578    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
6579    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
6580    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6581
6582    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
6583    htd.addFamily(new HColumnDescriptor(fam1));
6584    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6585        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6586    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(),
6587        rss, null);
6588
6589    assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false));
6590    String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
6591    String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();
6592    assertTrue(plugins.contains(replicationCoprocessorClass));
6593    assertTrue(region.getCoprocessorHost().
6594        getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
6595
6596    region.close();
6597  }
6598
6599  /**
6600   * The same as HRegion class, the only difference is that instantiateHStore will
6601   * create a different HStore - HStoreForTesting. [HBASE-8518]
6602   */
6603  public static class HRegionForTesting extends HRegion {
6604
6605    public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs,
6606                             final Configuration confParam, final RegionInfo regionInfo,
6607                             final TableDescriptor htd, final RegionServerServices rsServices) {
6608      this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
6609          wal, confParam, htd, rsServices);
6610    }
6611
6612    public HRegionForTesting(HRegionFileSystem fs, WAL wal,
6613                             Configuration confParam, TableDescriptor htd,
6614                             RegionServerServices rsServices) {
6615      super(fs, wal, confParam, htd, rsServices);
6616    }
6617
6618    /**
6619     * Create HStore instance.
6620     * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting.
6621     */
6622    @Override
6623    protected HStore instantiateHStore(final ColumnFamilyDescriptor family) throws IOException {
6624      if (family.isMobEnabled()) {
6625        if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
6626          throw new IOException("A minimum HFile version of "
6627              + HFile.MIN_FORMAT_VERSION_WITH_TAGS
6628              + " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY
6629              + " accordingly.");
6630        }
6631        return new HMobStore(this, family, this.conf);
6632      }
6633      return new HStoreForTesting(this, family, this.conf);
6634    }
6635  }
6636
6637  /**
6638   * HStoreForTesting is merely the same as HStore, the difference is in the doCompaction method
6639   * of HStoreForTesting there is a checkpoint "hbase.hstore.compaction.complete" which
6640   * doesn't let hstore compaction complete. In the former edition, this config is set in
6641   * HStore class inside compact method, though this is just for testing, otherwise it
6642   * doesn't do any help. In HBASE-8518, we try to get rid of all "hbase.hstore.compaction.complete"
6643   * config (except for testing code).
6644   */
6645  public static class HStoreForTesting extends HStore {
6646
6647    protected HStoreForTesting(final HRegion region,
6648        final ColumnFamilyDescriptor family,
6649        final Configuration confParam) throws IOException {
6650      super(region, family, confParam);
6651    }
6652
6653    @Override
6654    protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
6655        Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
6656        List<Path> newFiles) throws IOException {
6657      // let compaction incomplete.
6658      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
6659        LOG.warn("hbase.hstore.compaction.complete is set to false");
6660        List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
6661        final boolean evictOnClose =
6662            cacheConf != null? cacheConf.shouldEvictOnClose(): true;
6663        for (Path newFile : newFiles) {
6664          // Create storefile around what we wrote with a reader on it.
6665          HStoreFile sf = createStoreFileAndReader(newFile);
6666          sf.closeStoreFile(evictOnClose);
6667          sfs.add(sf);
6668        }
6669        return sfs;
6670      }
6671      return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles);
6672    }
6673  }
6674}