001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
023import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
024import static org.junit.Assert.assertArrayEquals;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertFalse;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertNull;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031import static org.mockito.ArgumentMatchers.any;
032import static org.mockito.ArgumentMatchers.anyLong;
033import static org.mockito.Mockito.doThrow;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.never;
036import static org.mockito.Mockito.spy;
037import static org.mockito.Mockito.times;
038import static org.mockito.Mockito.verify;
039import static org.mockito.Mockito.when;
040
041import java.io.IOException;
042import java.io.InterruptedIOException;
043import java.math.BigDecimal;
044import java.nio.charset.StandardCharsets;
045import java.security.PrivilegedExceptionAction;
046import java.util.ArrayList;
047import java.util.Arrays;
048import java.util.Collection;
049import java.util.List;
050import java.util.Map;
051import java.util.NavigableMap;
052import java.util.Objects;
053import java.util.TreeMap;
054import java.util.concurrent.Callable;
055import java.util.concurrent.CountDownLatch;
056import java.util.concurrent.ExecutorService;
057import java.util.concurrent.Executors;
058import java.util.concurrent.Future;
059import java.util.concurrent.TimeUnit;
060import java.util.concurrent.atomic.AtomicBoolean;
061import java.util.concurrent.atomic.AtomicInteger;
062import java.util.concurrent.atomic.AtomicReference;
063import org.apache.commons.lang3.RandomStringUtils;
064import org.apache.hadoop.conf.Configuration;
065import org.apache.hadoop.fs.FSDataOutputStream;
066import org.apache.hadoop.fs.FileStatus;
067import org.apache.hadoop.fs.FileSystem;
068import org.apache.hadoop.fs.Path;
069import org.apache.hadoop.hbase.ArrayBackedTag;
070import org.apache.hadoop.hbase.Cell;
071import org.apache.hadoop.hbase.Cell.Type;
072import org.apache.hadoop.hbase.CellBuilderFactory;
073import org.apache.hadoop.hbase.CellBuilderType;
074import org.apache.hadoop.hbase.CellUtil;
075import org.apache.hadoop.hbase.CompareOperator;
076import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
077import org.apache.hadoop.hbase.DoNotRetryIOException;
078import org.apache.hadoop.hbase.DroppedSnapshotException;
079import org.apache.hadoop.hbase.HBaseClassTestRule;
080import org.apache.hadoop.hbase.HBaseConfiguration;
081import org.apache.hadoop.hbase.HBaseTestingUtility;
082import org.apache.hadoop.hbase.HColumnDescriptor;
083import org.apache.hadoop.hbase.HConstants;
084import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
085import org.apache.hadoop.hbase.HDFSBlocksDistribution;
086import org.apache.hadoop.hbase.HRegionInfo;
087import org.apache.hadoop.hbase.HTableDescriptor;
088import org.apache.hadoop.hbase.KeyValue;
089import org.apache.hadoop.hbase.MiniHBaseCluster;
090import org.apache.hadoop.hbase.MultithreadedTestUtil;
091import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
092import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
093import org.apache.hadoop.hbase.NotServingRegionException;
094import org.apache.hadoop.hbase.PrivateCellUtil;
095import org.apache.hadoop.hbase.RegionTooBusyException;
096import org.apache.hadoop.hbase.ServerName;
097import org.apache.hadoop.hbase.StartMiniClusterOption;
098import org.apache.hadoop.hbase.TableName;
099import org.apache.hadoop.hbase.TagType;
100import org.apache.hadoop.hbase.Waiter;
101import org.apache.hadoop.hbase.client.Append;
102import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
103import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
104import org.apache.hadoop.hbase.client.Delete;
105import org.apache.hadoop.hbase.client.Durability;
106import org.apache.hadoop.hbase.client.Get;
107import org.apache.hadoop.hbase.client.Increment;
108import org.apache.hadoop.hbase.client.Mutation;
109import org.apache.hadoop.hbase.client.Put;
110import org.apache.hadoop.hbase.client.RegionInfo;
111import org.apache.hadoop.hbase.client.RegionInfoBuilder;
112import org.apache.hadoop.hbase.client.Result;
113import org.apache.hadoop.hbase.client.RowMutations;
114import org.apache.hadoop.hbase.client.Scan;
115import org.apache.hadoop.hbase.client.Table;
116import org.apache.hadoop.hbase.client.TableDescriptor;
117import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
118import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
119import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
120import org.apache.hadoop.hbase.filter.BigDecimalComparator;
121import org.apache.hadoop.hbase.filter.BinaryComparator;
122import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
123import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
124import org.apache.hadoop.hbase.filter.Filter;
125import org.apache.hadoop.hbase.filter.FilterBase;
126import org.apache.hadoop.hbase.filter.FilterList;
127import org.apache.hadoop.hbase.filter.NullComparator;
128import org.apache.hadoop.hbase.filter.PrefixFilter;
129import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
130import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
131import org.apache.hadoop.hbase.filter.SubstringComparator;
132import org.apache.hadoop.hbase.filter.ValueFilter;
133import org.apache.hadoop.hbase.io.TimeRange;
134import org.apache.hadoop.hbase.io.hfile.HFile;
135import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
136import org.apache.hadoop.hbase.monitoring.MonitoredTask;
137import org.apache.hadoop.hbase.monitoring.TaskMonitor;
138import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
139import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
140import org.apache.hadoop.hbase.regionserver.Region.RowLock;
141import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
142import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
143import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
144import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
145import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
146import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
147import org.apache.hadoop.hbase.security.User;
148import org.apache.hadoop.hbase.test.MetricsAssertHelper;
149import org.apache.hadoop.hbase.testclassification.LargeTests;
150import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
151import org.apache.hadoop.hbase.util.Bytes;
152import org.apache.hadoop.hbase.util.CommonFSUtils;
153import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
154import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
155import org.apache.hadoop.hbase.util.HFileArchiveUtil;
156import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
157import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
158import org.apache.hadoop.hbase.util.Threads;
159import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
160import org.apache.hadoop.hbase.wal.FaultyFSLog;
161import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
162import org.apache.hadoop.hbase.wal.WAL;
163import org.apache.hadoop.hbase.wal.WALEdit;
164import org.apache.hadoop.hbase.wal.WALFactory;
165import org.apache.hadoop.hbase.wal.WALKeyImpl;
166import org.apache.hadoop.hbase.wal.WALProvider;
167import org.apache.hadoop.hbase.wal.WALProvider.Writer;
168import org.apache.hadoop.hbase.wal.WALSplitUtil;
169import org.junit.After;
170import org.junit.Assert;
171import org.junit.Before;
172import org.junit.ClassRule;
173import org.junit.Rule;
174import org.junit.Test;
175import org.junit.experimental.categories.Category;
176import org.junit.rules.ExpectedException;
177import org.junit.rules.TestName;
178import org.mockito.ArgumentCaptor;
179import org.mockito.ArgumentMatcher;
180import org.mockito.Mockito;
181import org.mockito.invocation.InvocationOnMock;
182import org.mockito.stubbing.Answer;
183import org.slf4j.Logger;
184import org.slf4j.LoggerFactory;
185
186import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
187import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
188import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
189import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
190import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
191
192import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
199
200/**
201 * Basic stand-alone testing of HRegion.  No clusters!
202 *
203 * A lot of the meta information for an HRegion now lives inside other HRegions
204 * or in the HBaseMaster, so only basic testing is possible.
205 */
206@Category({VerySlowRegionServerTests.class, LargeTests.class})
207@SuppressWarnings("deprecation")
208public class TestHRegion {
209
  /** Class-level JUnit rule obtained from {@link HBaseClassTestRule#forClass(Class)}. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegion.class);

  // Do not spin up clusters in here. If you need to spin up a cluster, do it
  // over in TestHRegionOnCluster.
  private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class);
  /** Name of the running test method; setup() derives {@code method} and {@code tableName}. */
  @Rule
  public TestName name = new TestName();
  /** Expected-exception rule; starts out expecting nothing. */
  @Rule public final ExpectedException thrown = ExpectedException.none();

  private static final String COLUMN_FAMILY = "MyCF";
  private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
  // Event loop group registered on CONF through NettyAsyncFSWALConfigHelper in setup().
  private static final EventLoopGroup GROUP = new NioEventLoopGroup();

  // Region under test. tearDown() closes it (together with its WAL) after every test method.
  HRegion region = null;
  // Do not run unit tests in parallel (? Why not?  It don't work?  Why not?  St.Ack)
  // NOTE: TEST_UTIL, CONF and FILESYSTEM are static but are reassigned by setup() for each test.
  protected static HBaseTestingUtility TEST_UTIL;
  public static Configuration CONF ;
  // String form of the "TestHRegion" data test directory; assigned in setup().
  private String dir;
  private static FileSystem FILESYSTEM;
  private final int MAX_VERSIONS = 2;

  // Test names
  // Both are assigned in setup(): method is the test method name, tableName is built from it.
  protected TableName tableName;
  protected String method;
  // Canned qualifiers, values and row keys shared by the test methods.
  protected final byte[] qual = Bytes.toBytes("qual");
  protected final byte[] qual1 = Bytes.toBytes("qual1");
  protected final byte[] qual2 = Bytes.toBytes("qual2");
  protected final byte[] qual3 = Bytes.toBytes("qual3");
  protected final byte[] value = Bytes.toBytes("value");
  protected final byte[] value1 = Bytes.toBytes("value1");
  protected final byte[] value2 = Bytes.toBytes("value2");
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");

  // Metrics assertion helper looked up through CompatibilitySingletonFactory.
  protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory
      .getInstance(MetricsAssertHelper.class);
248
249  @Before
250  public void setup() throws IOException {
251    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
252    FILESYSTEM = TEST_UTIL.getTestFileSystem();
253    CONF = TEST_UTIL.getConfiguration();
254    NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, GROUP, NioSocketChannel.class);
255    dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
256    method = name.getMethodName();
257    tableName = TableName.valueOf(method);
258    CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09));
259  }
260
261  @After
262  public void tearDown() throws IOException {
263    // Region may have been closed, but it is still no harm if we close it again here using HTU.
264    HBaseTestingUtility.closeRegionAndWAL(region);
265    EnvironmentEdgeManagerTestHelper.reset();
266    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
267    TEST_UTIL.cleanupTestDir();
268  }
269
270  /**
271   * Test that I can use the max flushed sequence id after the close.
272   * @throws IOException
273   */
274  @Test
275  public void testSequenceId() throws IOException {
276    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
277    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
278    // Weird. This returns 0 if no store files or no edits. Afraid to change it.
279    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
280    HBaseTestingUtility.closeRegionAndWAL(this.region);
281    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
282    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
283    // Open region again.
284    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
285    byte [] value = Bytes.toBytes(method);
286    // Make a random put against our cf.
287    Put put = new Put(value);
288    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
289    region.put(put);
290    // No flush yet so init numbers should still be in place.
291    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
292    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
293    region.flush(true);
294    long max = region.getMaxFlushedSeqId();
295    HBaseTestingUtility.closeRegionAndWAL(this.region);
296    assertEquals(max, region.getMaxFlushedSeqId());
297    this.region = null;
298  }
299
300  /**
301   * Test for Bug 2 of HBASE-10466.
302   * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
303   * is smaller than a certain value, or when region close starts a flush is ongoing, the first
304   * flush is skipped and only the second flush takes place. However, two flushes are required in
305   * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
306   * in current memstore. The fix is removing all conditions except abort check so we ensure 2
307   * flushes for region close."
308   * @throws IOException
309   */
310  @Test
311  public void testCloseCarryingSnapshot() throws IOException {
312    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
313    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
314    // Get some random bytes.
315    byte [] value = Bytes.toBytes(method);
316    // Make a random put against our cf.
317    Put put = new Put(value);
318    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
319    // First put something in current memstore, which will be in snapshot after flusher.prepare()
320    region.put(put);
321    StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
322    storeFlushCtx.prepare();
323    // Second put something in current memstore
324    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
325    region.put(put);
326    // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
327    HBaseTestingUtility.closeRegionAndWAL(region);
328    assertEquals(0, region.getMemStoreDataSize());
329    region = null;
330  }
331
332  /*
333   * This test is for verifying memstore snapshot size is correctly updated in case of rollback
334   * See HBASE-10845
335   */
336  @Test
337  public void testMemstoreSnapshotSize() throws IOException {
338    class MyFaultyFSLog extends FaultyFSLog {
339      StoreFlushContext storeFlushCtx;
340      public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
341          throws IOException {
342        super(fs, rootDir, logName, conf);
343      }
344
345      void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
346        this.storeFlushCtx = storeFlushCtx;
347      }
348
349      @Override
350      public void sync(long txid) throws IOException {
351        storeFlushCtx.prepare();
352        super.sync(txid);
353      }
354    }
355
356    FileSystem fs = FileSystem.get(CONF);
357    Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
358    MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
359    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog,
360        COLUMN_FAMILY_BYTES);
361
362    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
363    // Get some random bytes.
364    byte [] value = Bytes.toBytes(method);
365    faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY));
366
367    Put put = new Put(value);
368    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
369    faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
370
371    boolean threwIOE = false;
372    try {
373      region.put(put);
374    } catch (IOException ioe) {
375      threwIOE = true;
376    } finally {
377      assertTrue("The regionserver should have thrown an exception", threwIOE);
378    }
379    MemStoreSize mss = store.getFlushableSize();
380    assertTrue("flushable size should be zero, but it is " + mss,
381        mss.getDataSize() == 0);
382  }
383
384  /**
385   * Create a WAL outside of the usual helper in
386   * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method
387   * doesn't play nicely with FaultyFileSystem. Call this method before overriding
388   * {@code fs.file.impl}.
389   * @param callingMethod a unique component for the path, probably the name of the test method.
390   */
391  private static WAL createWALCompatibleWithFaultyFileSystem(String callingMethod,
392      Configuration conf, TableName tableName) throws IOException {
393    final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
394    final Configuration walConf = new Configuration(conf);
395    CommonFSUtils.setRootDir(walConf, logDir);
396    return new WALFactory(walConf, callingMethod)
397        .getWAL(RegionInfoBuilder.newBuilder(tableName).build());
398  }
399
400  @Test
401  public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
402    String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
403    FileSystem fs = FileSystem.get(CONF);
404    Path rootDir = new Path(dir + testName);
405    FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
406    hLog.init();
407    HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
408        COLUMN_FAMILY_BYTES);
409    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
410    assertEquals(0, region.getMemStoreDataSize());
411
412    // Put one value
413    byte [] value = Bytes.toBytes(method);
414    Put put = new Put(value);
415    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
416    region.put(put);
417    long onePutSize = region.getMemStoreDataSize();
418    assertTrue(onePutSize > 0);
419
420    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
421    doThrow(new IOException())
422       .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
423    region.setCoprocessorHost(mockedCPHost);
424
425    put = new Put(value);
426    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
427    try {
428      region.put(put);
429      fail("Should have failed with IOException");
430    } catch (IOException expected) {
431    }
432    long expectedSize = onePutSize * 2;
433    assertEquals("memstoreSize should be incremented",
434        expectedSize, region.getMemStoreDataSize());
435    assertEquals("flushable size should be incremented",
436        expectedSize, store.getFlushableSize().getDataSize());
437
438    region.setCoprocessorHost(null);
439  }
440
441  /**
442   * A test case of HBASE-21041
443   * @throws Exception Exception
444   */
445  @Test
446  public void testFlushAndMemstoreSizeCounting() throws Exception {
447    byte[] family = Bytes.toBytes("family");
448    this.region = initHRegion(tableName, method, CONF, family);
449    final WALFactory wals = new WALFactory(CONF, method);
450    try {
451      for (byte[] row : HBaseTestingUtility.ROWS) {
452        Put put = new Put(row);
453        put.addColumn(family, family, row);
454        region.put(put);
455      }
456      region.flush(true);
457      // After flush, data size should be zero
458      assertEquals(0, region.getMemStoreDataSize());
459      // After flush, a new active mutable segment is created, so the heap size
460      // should equal to MutableSegment.DEEP_OVERHEAD
461      assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
462      // After flush, offheap should be zero
463      assertEquals(0, region.getMemStoreOffHeapSize());
464    } finally {
465      HBaseTestingUtility.closeRegionAndWAL(this.region);
466      this.region = null;
467      wals.close();
468    }
469  }
470
471  /**
472   * Test we do not lose data if we fail a flush and then close.
473   * Part of HBase-10466.  Tests the following from the issue description:
474   * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
475   * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
476   * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
477   * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
478   * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
479   * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
480   * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
481   * much smaller than expected. In extreme case, if the error accumulates to even bigger than
482   * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
483   * if memstoreSize is not larger than 0."
484   * @throws Exception
485   */
486  @Test
487  public void testFlushSizeAccounting() throws Exception {
488    final Configuration conf = HBaseConfiguration.create(CONF);
489    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
490    // Only retry once.
491    conf.setInt("hbase.hstore.flush.retries.number", 1);
492    final User user =
493      User.createUserForTesting(conf, method, new String[]{"foo"});
494    // Inject our faulty LocalFileSystem
495    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
496    user.runAs(new PrivilegedExceptionAction<Object>() {
497      @Override
498      public Object run() throws Exception {
499        // Make sure it worked (above is sensitive to caching details in hadoop core)
500        FileSystem fs = FileSystem.get(conf);
501        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
502        FaultyFileSystem ffs = (FaultyFileSystem)fs;
503        HRegion region = null;
504        try {
505          // Initialize region
506          region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal,
507              COLUMN_FAMILY_BYTES);
508          long size = region.getMemStoreDataSize();
509          Assert.assertEquals(0, size);
510          // Put one item into memstore.  Measure the size of one item in memstore.
511          Put p1 = new Put(row);
512          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null));
513          region.put(p1);
514          final long sizeOfOnePut = region.getMemStoreDataSize();
515          // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
516          try {
517            LOG.info("Flushing");
518            region.flush(true);
519            Assert.fail("Didn't bubble up IOE!");
520          } catch (DroppedSnapshotException dse) {
521            // What we are expecting
522            region.closing.set(false); // this is needed for the rest of the test to work
523          }
524          // Make it so all writes succeed from here on out
525          ffs.fault.set(false);
526          // Check sizes.  Should still be the one entry.
527          Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize());
528          // Now add two entries so that on this next flush that fails, we can see if we
529          // subtract the right amount, the snapshot size only.
530          Put p2 = new Put(row);
531          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
532          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
533          region.put(p2);
534          long expectedSize = sizeOfOnePut * 3;
535          Assert.assertEquals(expectedSize, region.getMemStoreDataSize());
536          // Do a successful flush.  It will clear the snapshot only.  Thats how flushes work.
537          // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
538          // it
539          region.flush(true);
540          // Make sure our memory accounting is right.
541          Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize());
542        } finally {
543          HBaseTestingUtility.closeRegionAndWAL(region);
544        }
545        return null;
546      }
547    });
548    FileSystem.closeAllForUGI(user.getUGI());
549  }
550
551  @Test
552  public void testCloseWithFailingFlush() throws Exception {
553    final Configuration conf = HBaseConfiguration.create(CONF);
554    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
555    // Only retry once.
556    conf.setInt("hbase.hstore.flush.retries.number", 1);
557    final User user =
558      User.createUserForTesting(conf, this.method, new String[]{"foo"});
559    // Inject our faulty LocalFileSystem
560    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
561    user.runAs(new PrivilegedExceptionAction<Object>() {
562      @Override
563      public Object run() throws Exception {
564        // Make sure it worked (above is sensitive to caching details in hadoop core)
565        FileSystem fs = FileSystem.get(conf);
566        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
567        FaultyFileSystem ffs = (FaultyFileSystem)fs;
568        HRegion region = null;
569        try {
570          // Initialize region
571          region = initHRegion(tableName, null, null, false,
572              Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
573          long size = region.getMemStoreDataSize();
574          Assert.assertEquals(0, size);
575          // Put one item into memstore.  Measure the size of one item in memstore.
576          Put p1 = new Put(row);
577          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
578          region.put(p1);
579          // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
580          HStore store = region.getStore(COLUMN_FAMILY_BYTES);
581          StoreFlushContext storeFlushCtx =
582              store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
583          storeFlushCtx.prepare();
584          // Now add two entries to the foreground memstore.
585          Put p2 = new Put(row);
586          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
587          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
588          region.put(p2);
589          // Now try close on top of a failing flush.
590          HBaseTestingUtility.closeRegionAndWAL(region);
591          region = null;
592          fail();
593        } catch (DroppedSnapshotException dse) {
594          // Expected
595          LOG.info("Expected DroppedSnapshotException");
596        } finally {
597          // Make it so all writes succeed from here on out so can close clean
598          ffs.fault.set(false);
599          HBaseTestingUtility.closeRegionAndWAL(region);
600        }
601        return null;
602      }
603    });
604    FileSystem.closeAllForUGI(user.getUGI());
605  }
606
607  @Test
608  public void testCompactionAffectedByScanners() throws Exception {
609    byte[] family = Bytes.toBytes("family");
610    this.region = initHRegion(tableName, method, CONF, family);
611
612    Put put = new Put(Bytes.toBytes("r1"));
613    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
614    region.put(put);
615    region.flush(true);
616
617    Scan scan = new Scan();
618    scan.setMaxVersions(3);
619    // open the first scanner
620    RegionScanner scanner1 = region.getScanner(scan);
621
622    Delete delete = new Delete(Bytes.toBytes("r1"));
623    region.delete(delete);
624    region.flush(true);
625
626    // open the second scanner
627    RegionScanner scanner2 = region.getScanner(scan);
628
629    List<Cell> results = new ArrayList<>();
630
631    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
632
633    // make a major compaction
634    region.compact(true);
635
636    // open the third scanner
637    RegionScanner scanner3 = region.getScanner(scan);
638
639    // get data from scanner 1, 2, 3 after major compaction
640    scanner1.next(results);
641    System.out.println(results);
642    assertEquals(1, results.size());
643
644    results.clear();
645    scanner2.next(results);
646    System.out.println(results);
647    assertEquals(0, results.size());
648
649    results.clear();
650    scanner3.next(results);
651    System.out.println(results);
652    assertEquals(0, results.size());
653  }
654
655  @Test
656  public void testToShowNPEOnRegionScannerReseek() throws Exception {
657    byte[] family = Bytes.toBytes("family");
658    this.region = initHRegion(tableName, method, CONF, family);
659
660    Put put = new Put(Bytes.toBytes("r1"));
661    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
662    region.put(put);
663    put = new Put(Bytes.toBytes("r2"));
664    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
665    region.put(put);
666    region.flush(true);
667
668    Scan scan = new Scan();
669    scan.setMaxVersions(3);
670    // open the first scanner
671    RegionScanner scanner1 = region.getScanner(scan);
672
673    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
674
675    region.compact(true);
676
677    scanner1.reseek(Bytes.toBytes("r2"));
678    List<Cell> results = new ArrayList<>();
679    scanner1.next(results);
680    Cell keyValue = results.get(0);
681    Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0);
682    scanner1.close();
683  }
684
685  @Test
686  public void testArchiveRecoveredEditsReplay() throws Exception {
687    byte[] family = Bytes.toBytes("family");
688    this.region = initHRegion(tableName, method, CONF, family);
689    final WALFactory wals = new WALFactory(CONF, method);
690    try {
691      Path regiondir = region.getRegionFileSystem().getRegionDir();
692      FileSystem fs = region.getRegionFileSystem().getFileSystem();
693      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
694
695      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
696
697      long maxSeqId = 1050;
698      long minSeqId = 1000;
699
700      for (long i = minSeqId; i <= maxSeqId; i += 10) {
701        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
702        fs.create(recoveredEdits);
703        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
704
705        long time = System.nanoTime();
706        WALEdit edit = new WALEdit();
707        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
708          .toBytes(i)));
709        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
710          HConstants.DEFAULT_CLUSTER_ID), edit));
711
712        writer.close();
713      }
714      MonitoredTask status = TaskMonitor.get().createStatus(method);
715      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
716      for (HStore store : region.getStores()) {
717        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
718      }
719      CONF.set("hbase.region.archive.recovered.edits", "true");
720      CONF.set(CommonFSUtils.HBASE_WAL_DIR, "/custom_wal_dir");
721      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
722      assertEquals(maxSeqId, seqId);
723      region.getMVCC().advanceTo(seqId);
724      String fakeFamilyName = recoveredEditsDir.getName();
725      Path rootDir = new Path(CONF.get(HConstants.HBASE_DIR));
726      Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(rootDir,
727        region.getRegionInfo(), Bytes.toBytes(fakeFamilyName));
728      FileStatus[] list = TEST_UTIL.getTestFileSystem().listStatus(storeArchiveDir);
729      assertEquals(6, list.length);
730    } finally {
731      CONF.set("hbase.region.archive.recovered.edits", "false");
732      CONF.set(CommonFSUtils.HBASE_WAL_DIR, "");
733      HBaseTestingUtility.closeRegionAndWAL(this.region);
734      this.region = null;
735      wals.close();
736    }
737  }
738
739  @Test
740  public void testSkipRecoveredEditsReplay() throws Exception {
741    byte[] family = Bytes.toBytes("family");
742    this.region = initHRegion(tableName, method, CONF, family);
743    final WALFactory wals = new WALFactory(CONF, method);
744    try {
745      Path regiondir = region.getRegionFileSystem().getRegionDir();
746      FileSystem fs = region.getRegionFileSystem().getFileSystem();
747      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
748
749      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
750
751      long maxSeqId = 1050;
752      long minSeqId = 1000;
753
754      for (long i = minSeqId; i <= maxSeqId; i += 10) {
755        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
756        fs.create(recoveredEdits);
757        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
758
759        long time = System.nanoTime();
760        WALEdit edit = new WALEdit();
761        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
762            .toBytes(i)));
763        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
764            HConstants.DEFAULT_CLUSTER_ID), edit));
765
766        writer.close();
767      }
768      MonitoredTask status = TaskMonitor.get().createStatus(method);
769      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
770      for (HStore store : region.getStores()) {
771        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
772      }
773      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
774      assertEquals(maxSeqId, seqId);
775      region.getMVCC().advanceTo(seqId);
776      Get get = new Get(row);
777      Result result = region.get(get);
778      for (long i = minSeqId; i <= maxSeqId; i += 10) {
779        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
780        assertEquals(1, kvs.size());
781        assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
782      }
783    } finally {
784      HBaseTestingUtility.closeRegionAndWAL(this.region);
785      this.region = null;
786      wals.close();
787    }
788  }
789
790  @Test
791  public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
792    byte[] family = Bytes.toBytes("family");
793    this.region = initHRegion(tableName, method, CONF, family);
794    final WALFactory wals = new WALFactory(CONF, method);
795    try {
796      Path regiondir = region.getRegionFileSystem().getRegionDir();
797      FileSystem fs = region.getRegionFileSystem().getFileSystem();
798      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
799
800      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
801
802      long maxSeqId = 1050;
803      long minSeqId = 1000;
804
805      for (long i = minSeqId; i <= maxSeqId; i += 10) {
806        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
807        fs.create(recoveredEdits);
808        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
809
810        long time = System.nanoTime();
811        WALEdit edit = new WALEdit();
812        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
813            .toBytes(i)));
814        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
815            HConstants.DEFAULT_CLUSTER_ID), edit));
816
817        writer.close();
818      }
819      long recoverSeqId = 1030;
820      MonitoredTask status = TaskMonitor.get().createStatus(method);
821      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
822      for (HStore store : region.getStores()) {
823        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
824      }
825      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
826      assertEquals(maxSeqId, seqId);
827      region.getMVCC().advanceTo(seqId);
828      Get get = new Get(row);
829      Result result = region.get(get);
830      for (long i = minSeqId; i <= maxSeqId; i += 10) {
831        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
832        if (i < recoverSeqId) {
833          assertEquals(0, kvs.size());
834        } else {
835          assertEquals(1, kvs.size());
836          assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
837        }
838      }
839    } finally {
840      HBaseTestingUtility.closeRegionAndWAL(this.region);
841      this.region = null;
842      wals.close();
843    }
844  }
845
846  @Test
847  public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
848    byte[] family = Bytes.toBytes("family");
849    this.region = initHRegion(tableName, method, CONF, family);
850    Path regiondir = region.getRegionFileSystem().getRegionDir();
851    FileSystem fs = region.getRegionFileSystem().getFileSystem();
852
853    Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
854    for (int i = 1000; i < 1050; i += 10) {
855      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
856      FSDataOutputStream dos = fs.create(recoveredEdits);
857      dos.writeInt(i);
858      dos.close();
859    }
860    long minSeqId = 2000;
861    Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1));
862    FSDataOutputStream dos = fs.create(recoveredEdits);
863    dos.close();
864
865    Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
866    for (HStore store : region.getStores()) {
867      maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId);
868    }
869    long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
870    assertEquals(minSeqId, seqId);
871  }
872
873  @Test
874  public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
875    byte[] family = Bytes.toBytes("family");
876    this.region = initHRegion(tableName, method, CONF, family);
877    final WALFactory wals = new WALFactory(CONF, method);
878    try {
879      Path regiondir = region.getRegionFileSystem().getRegionDir();
880      FileSystem fs = region.getRegionFileSystem().getFileSystem();
881      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
882      byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]);
883
884      assertEquals(0, region.getStoreFileList(columns).size());
885
886      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
887
888      long maxSeqId = 1050;
889      long minSeqId = 1000;
890
891      for (long i = minSeqId; i <= maxSeqId; i += 10) {
892        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
893        fs.create(recoveredEdits);
894        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
895
896        long time = System.nanoTime();
897        WALEdit edit = null;
898        if (i == maxSeqId) {
899          edit = WALEdit.createCompaction(region.getRegionInfo(),
900          CompactionDescriptor.newBuilder()
901          .setTableName(ByteString.copyFrom(tableName.getName()))
902          .setFamilyName(ByteString.copyFrom(regionName))
903          .setEncodedRegionName(ByteString.copyFrom(regionName))
904          .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
905          .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
906          .build());
907        } else {
908          edit = new WALEdit();
909          edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
910            .toBytes(i)));
911        }
912        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
913            HConstants.DEFAULT_CLUSTER_ID), edit));
914        writer.close();
915      }
916
917      long recoverSeqId = 1030;
918      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
919      MonitoredTask status = TaskMonitor.get().createStatus(method);
920      for (HStore store : region.getStores()) {
921        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
922      }
923      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
924      assertEquals(maxSeqId, seqId);
925
926      // assert that the files are flushed
927      assertEquals(1, region.getStoreFileList(columns).size());
928
929    } finally {
930      HBaseTestingUtility.closeRegionAndWAL(this.region);
931      this.region = null;
932      wals.close();
933    }
934  }
935
936  @Test
937  public void testRecoveredEditsReplayCompaction() throws Exception {
938    testRecoveredEditsReplayCompaction(false);
939    testRecoveredEditsReplayCompaction(true);
940  }
941
942  public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
943    CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class);
944    byte[] family = Bytes.toBytes("family");
945    this.region = initHRegion(tableName, method, CONF, family);
946    final WALFactory wals = new WALFactory(CONF, method);
947    try {
948      Path regiondir = region.getRegionFileSystem().getRegionDir();
949      FileSystem fs = region.getRegionFileSystem().getFileSystem();
950      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
951
952      long maxSeqId = 3;
953      long minSeqId = 0;
954
955      for (long i = minSeqId; i < maxSeqId; i++) {
956        Put put = new Put(Bytes.toBytes(i));
957        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
958        region.put(put);
959        region.flush(true);
960      }
961
962      // this will create a region with 3 files
963      assertEquals(3, region.getStore(family).getStorefilesCount());
964      List<Path> storeFiles = new ArrayList<>(3);
965      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
966        storeFiles.add(sf.getPath());
967      }
968
969      // disable compaction completion
970      CONF.setBoolean("hbase.hstore.compaction.complete", false);
971      region.compactStores();
972
973      // ensure that nothing changed
974      assertEquals(3, region.getStore(family).getStorefilesCount());
975
976      // now find the compacted file, and manually add it to the recovered edits
977      Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
978      FileStatus[] files = CommonFSUtils.listStatus(fs, tmpDir);
979      String errorMsg = "Expected to find 1 file in the region temp directory "
980          + "from the compaction, could not find any";
981      assertNotNull(errorMsg, files);
982      assertEquals(errorMsg, 1, files.length);
983      // move the file inside region dir
984      Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
985          files[0].getPath());
986
987      byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes();
988      byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length];
989      for (int i=0; i < encodedNameAsBytes.length; i++) {
990        // Mix the byte array to have a new encodedName
991        fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1);
992      }
993
994      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
995        .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family,
996            storeFiles, Lists.newArrayList(newFile),
997            region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
998
999      WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
1000          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
1001
1002      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
1003
1004      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1005      fs.create(recoveredEdits);
1006      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1007
1008      long time = System.nanoTime();
1009
1010      writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time,
1011          HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
1012          compactionDescriptor)));
1013      writer.close();
1014
1015      // close the region now, and reopen again
1016      region.getTableDescriptor();
1017      region.getRegionInfo();
1018      HBaseTestingUtility.closeRegionAndWAL(this.region);
1019      try {
1020        region = HRegion.openHRegion(region, null);
1021      } catch (WrongRegionException wre) {
1022        fail("Matching encoded region name should not have produced WrongRegionException");
1023      }
1024
1025      // now check whether we have only one store file, the compacted one
1026      Collection<HStoreFile> sfs = region.getStore(family).getStorefiles();
1027      for (HStoreFile sf : sfs) {
1028        LOG.info(Objects.toString(sf.getPath()));
1029      }
1030      if (!mismatchedRegionName) {
1031        assertEquals(1, region.getStore(family).getStorefilesCount());
1032      }
1033      files = CommonFSUtils.listStatus(fs, tmpDir);
1034      assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);
1035
1036      for (long i = minSeqId; i < maxSeqId; i++) {
1037        Get get = new Get(Bytes.toBytes(i));
1038        Result result = region.get(get);
1039        byte[] value = result.getValue(family, Bytes.toBytes(i));
1040        assertArrayEquals(Bytes.toBytes(i), value);
1041      }
1042    } finally {
1043      HBaseTestingUtility.closeRegionAndWAL(this.region);
1044      this.region = null;
1045      wals.close();
1046      CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class);
1047    }
1048  }
1049
1050  @Test
1051  public void testFlushMarkers() throws Exception {
1052    // tests that flush markers are written to WAL and handled at recovered edits
1053    byte[] family = Bytes.toBytes("family");
1054    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
1055    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1056    CommonFSUtils.setRootDir(walConf, logDir);
1057    final WALFactory wals = new WALFactory(walConf, method);
1058    final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build());
1059
1060    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1061      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1062    try {
1063      Path regiondir = region.getRegionFileSystem().getRegionDir();
1064      FileSystem fs = region.getRegionFileSystem().getFileSystem();
1065      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
1066
1067      long maxSeqId = 3;
1068      long minSeqId = 0;
1069
1070      for (long i = minSeqId; i < maxSeqId; i++) {
1071        Put put = new Put(Bytes.toBytes(i));
1072        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1073        region.put(put);
1074        region.flush(true);
1075      }
1076
1077      // this will create a region with 3 files from flush
1078      assertEquals(3, region.getStore(family).getStorefilesCount());
1079      List<String> storeFiles = new ArrayList<>(3);
1080      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
1081        storeFiles.add(sf.getPath().getName());
1082      }
1083
1084      // now verify that the flush markers are written
1085      wal.shutdown();
1086      WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
1087        TEST_UTIL.getConfiguration());
1088      try {
1089        List<WAL.Entry> flushDescriptors = new ArrayList<>();
1090        long lastFlushSeqId = -1;
1091        while (true) {
1092          WAL.Entry entry = reader.next();
1093          if (entry == null) {
1094            break;
1095          }
1096          Cell cell = entry.getEdit().getCells().get(0);
1097          if (WALEdit.isMetaEditFamily(cell)) {
1098            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
1099            assertNotNull(flushDesc);
1100            assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
1101            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1102              assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
1103            } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1104              assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
1105            }
1106            lastFlushSeqId = flushDesc.getFlushSequenceNumber();
1107            assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
1108            assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
1109            StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
1110            assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
1111            assertEquals("family", storeFlushDesc.getStoreHomeDir());
1112            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1113              assertEquals(0, storeFlushDesc.getFlushOutputCount());
1114            } else {
1115              assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
1116              assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
1117            }
1118
1119            flushDescriptors.add(entry);
1120          }
1121        }
1122
1123        assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
1124
1125        // now write those markers to the recovered edits again.
1126
1127        Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
1128
1129        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1130        fs.create(recoveredEdits);
1131        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1132
1133        for (WAL.Entry entry : flushDescriptors) {
1134          writer.append(entry);
1135        }
1136        writer.close();
1137      } finally {
1138        if (null != reader) {
1139          try {
1140            reader.close();
1141          } catch (IOException exception) {
1142            LOG.warn("Problem closing wal: " + exception.getMessage());
1143            LOG.debug("exception details", exception);
1144          }
1145        }
1146      }
1147
1148      // close the region now, and reopen again
1149      HBaseTestingUtility.closeRegionAndWAL(this.region);
1150      region = HRegion.openHRegion(region, null);
1151
1152      // now check whether we have can read back the data from region
1153      for (long i = minSeqId; i < maxSeqId; i++) {
1154        Get get = new Get(Bytes.toBytes(i));
1155        Result result = region.get(get);
1156        byte[] value = result.getValue(family, Bytes.toBytes(i));
1157        assertArrayEquals(Bytes.toBytes(i), value);
1158      }
1159    } finally {
1160      HBaseTestingUtility.closeRegionAndWAL(this.region);
1161      this.region = null;
1162      wals.close();
1163    }
1164  }
1165
1166  static class IsFlushWALMarker implements ArgumentMatcher<WALEdit> {
1167    volatile FlushAction[] actions;
1168    public IsFlushWALMarker(FlushAction... actions) {
1169      this.actions = actions;
1170    }
1171    @Override
1172    public boolean matches(WALEdit edit) {
1173      List<Cell> cells = edit.getCells();
1174      if (cells.isEmpty()) {
1175        return false;
1176      }
1177      if (WALEdit.isMetaEditFamily(cells.get(0))) {
1178        FlushDescriptor desc;
1179        try {
1180          desc = WALEdit.getFlushDescriptor(cells.get(0));
1181        } catch (IOException e) {
1182          LOG.warn(e.toString(), e);
1183          return false;
1184        }
1185        if (desc != null) {
1186          for (FlushAction action : actions) {
1187            if (desc.getAction() == action) {
1188              return true;
1189            }
1190          }
1191        }
1192      }
1193      return false;
1194    }
1195    public IsFlushWALMarker set(FlushAction... actions) {
1196      this.actions = actions;
1197      return this;
1198    }
1199  }
1200
1201  @Test
1202  public void testFlushMarkersWALFail() throws Exception {
1203    // test the cases where the WAL append for flush markers fail.
1204    byte[] family = Bytes.toBytes("family");
1205
1206    // spy an actual WAL implementation to throw exception (was not able to mock)
1207    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log");
1208
1209    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1210    CommonFSUtils.setRootDir(walConf, logDir);
1211    // Make up a WAL that we can manipulate at append time.
1212    class FailAppendFlushMarkerWAL extends FSHLog {
1213      volatile FlushAction [] flushActions = null;
1214
1215      public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf)
1216      throws IOException {
1217        super(fs, root, logDir, conf);
1218      }
1219
1220      @Override
1221      protected Writer createWriterInstance(Path path) throws IOException {
1222        final Writer w = super.createWriterInstance(path);
1223        return new Writer() {
1224          @Override
1225          public void close() throws IOException {
1226            w.close();
1227          }
1228
1229          @Override
1230          public void sync(boolean forceSync) throws IOException {
1231            w.sync(forceSync);
1232          }
1233
1234          @Override
1235          public void append(Entry entry) throws IOException {
1236            List<Cell> cells = entry.getEdit().getCells();
1237            if (WALEdit.isMetaEditFamily(cells.get(0))) {
1238              FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0));
1239              if (desc != null) {
1240                for (FlushAction flushAction: flushActions) {
1241                  if (desc.getAction().equals(flushAction)) {
1242                    throw new IOException("Failed to append flush marker! " + flushAction);
1243                  }
1244                }
1245              }
1246            }
1247            w.append(entry);
1248          }
1249
1250          @Override
1251          public long getLength() {
1252            return w.getLength();
1253          }
1254        };
1255      }
1256    }
1257    FailAppendFlushMarkerWAL wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf),
1258      CommonFSUtils.getRootDir(walConf), method, walConf);
1259    wal.init();
1260    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1261      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1262    int i = 0;
1263    Put put = new Put(Bytes.toBytes(i));
1264    put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
1265    put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1266    region.put(put);
1267
1268    // 1. Test case where START_FLUSH throws exception
1269    wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};
1270
1271    // start cache flush will throw exception
1272    try {
1273      region.flush(true);
1274      fail("This should have thrown exception");
1275    } catch (DroppedSnapshotException unexpected) {
1276      // this should not be a dropped snapshot exception. Meaning that RS will not abort
1277      throw unexpected;
1278    } catch (IOException expected) {
1279      // expected
1280    }
1281    // The WAL is hosed now. It has two edits appended. We cannot roll the log without it
1282    // throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
1283    region.close(true);
1284    wal.close();
1285
1286    // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
1287    wal.flushActions = new FlushAction[] { FlushAction.COMMIT_FLUSH };
1288    wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), CommonFSUtils.getRootDir(walConf),
1289      method, walConf);
1290    wal.init();
1291    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1292      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1293    region.put(put);
1294    // 3. Test case where ABORT_FLUSH will throw exception.
1295    // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
1296    // DroppedSnapshotException. Below COMMIT_FLUSH will cause flush to abort
1297    wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH};
1298
1299    try {
1300      region.flush(true);
1301      fail("This should have thrown exception");
1302    } catch (DroppedSnapshotException expected) {
1303      // we expect this exception, since we were able to write the snapshot, but failed to
1304      // write the flush marker to WAL
1305    } catch (IOException unexpected) {
1306      throw unexpected;
1307    }
1308  }
1309
1310  @Test
1311  public void testGetWhileRegionClose() throws IOException {
1312    Configuration hc = initSplit();
1313    int numRows = 100;
1314    byte[][] families = { fam1, fam2, fam3 };
1315
1316    // Setting up region
1317    this.region = initHRegion(tableName, method, hc, families);
1318    // Put data in region
1319    final int startRow = 100;
1320    putData(startRow, numRows, qual1, families);
1321    putData(startRow, numRows, qual2, families);
1322    putData(startRow, numRows, qual3, families);
1323    final AtomicBoolean done = new AtomicBoolean(false);
1324    final AtomicInteger gets = new AtomicInteger(0);
1325    GetTillDoneOrException[] threads = new GetTillDoneOrException[10];
1326    try {
1327      // Set ten threads running concurrently getting from the region.
1328      for (int i = 0; i < threads.length / 2; i++) {
1329        threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1330        threads[i].setDaemon(true);
1331        threads[i].start();
1332      }
1333      // Artificially make the condition by setting closing flag explicitly.
1334      // I can't make the issue happen with a call to region.close().
1335      this.region.closing.set(true);
1336      for (int i = threads.length / 2; i < threads.length; i++) {
1337        threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1338        threads[i].setDaemon(true);
1339        threads[i].start();
1340      }
1341    } finally {
1342      if (this.region != null) {
1343        HBaseTestingUtility.closeRegionAndWAL(this.region);
1344        this.region = null;
1345      }
1346    }
1347    done.set(true);
1348    for (GetTillDoneOrException t : threads) {
1349      try {
1350        t.join();
1351      } catch (InterruptedException e) {
1352        e.printStackTrace();
1353      }
1354      if (t.e != null) {
1355        LOG.info("Exception=" + t.e);
1356        assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
1357      }
1358    }
1359  }
1360
1361  /*
1362   * Thread that does get on single row until 'done' flag is flipped. If an
1363   * exception causes us to fail, it records it.
1364   */
1365  class GetTillDoneOrException extends Thread {
1366    private final Get g;
1367    private final AtomicBoolean done;
1368    private final AtomicInteger count;
1369    private Exception e;
1370
1371    GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d,
1372        final AtomicInteger c) {
1373      super("getter." + i);
1374      this.g = new Get(r);
1375      this.done = d;
1376      this.count = c;
1377    }
1378
1379    @Override
1380    public void run() {
1381      while (!this.done.get()) {
1382        try {
1383          assertTrue(region.get(g).size() > 0);
1384          this.count.incrementAndGet();
1385        } catch (Exception e) {
1386          this.e = e;
1387          break;
1388        }
1389      }
1390    }
1391  }
1392
1393  /*
1394   * An involved filter test. Has multiple column families and deletes in mix.
1395   */
1396  @Test
1397  public void testWeirdCacheBehaviour() throws Exception {
1398    final TableName tableName = TableName.valueOf(name.getMethodName());
1399    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
1400        Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
1401    this.region = initHRegion(tableName, method, CONF, FAMILIES);
1402    String value = "this is the value";
1403    String value2 = "this is some other value";
1404    String keyPrefix1 = "prefix1";
1405    String keyPrefix2 = "prefix2";
1406    String keyPrefix3 = "prefix3";
1407    putRows(this.region, 3, value, keyPrefix1);
1408    putRows(this.region, 3, value, keyPrefix2);
1409    putRows(this.region, 3, value, keyPrefix3);
1410    putRows(this.region, 3, value2, keyPrefix1);
1411    putRows(this.region, 3, value2, keyPrefix2);
1412    putRows(this.region, 3, value2, keyPrefix3);
1413    System.out.println("Checking values for key: " + keyPrefix1);
1414    assertEquals("Got back incorrect number of rows from scan", 3,
1415        getNumberOfRows(keyPrefix1, value2, this.region));
1416    System.out.println("Checking values for key: " + keyPrefix2);
1417    assertEquals("Got back incorrect number of rows from scan", 3,
1418        getNumberOfRows(keyPrefix2, value2, this.region));
1419    System.out.println("Checking values for key: " + keyPrefix3);
1420    assertEquals("Got back incorrect number of rows from scan", 3,
1421        getNumberOfRows(keyPrefix3, value2, this.region));
1422    deleteColumns(this.region, value2, keyPrefix1);
1423    deleteColumns(this.region, value2, keyPrefix2);
1424    deleteColumns(this.region, value2, keyPrefix3);
1425    System.out.println("Starting important checks.....");
1426    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0,
1427        getNumberOfRows(keyPrefix1, value2, this.region));
1428    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0,
1429        getNumberOfRows(keyPrefix2, value2, this.region));
1430    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0,
1431        getNumberOfRows(keyPrefix3, value2, this.region));
1432  }
1433
1434  @Test
1435  public void testAppendWithReadOnlyTable() throws Exception {
1436    final TableName tableName = TableName.valueOf(name.getMethodName());
1437    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1438    boolean exceptionCaught = false;
1439    Append append = new Append(Bytes.toBytes("somerow"));
1440    append.setDurability(Durability.SKIP_WAL);
1441    append.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"),
1442        Bytes.toBytes("somevalue"));
1443    try {
1444      region.append(append);
1445    } catch (IOException e) {
1446      exceptionCaught = true;
1447    }
1448    assertTrue(exceptionCaught == true);
1449  }
1450
1451  @Test
1452  public void testIncrWithReadOnlyTable() throws Exception {
1453    final TableName tableName = TableName.valueOf(name.getMethodName());
1454    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1455    boolean exceptionCaught = false;
1456    Increment inc = new Increment(Bytes.toBytes("somerow"));
1457    inc.setDurability(Durability.SKIP_WAL);
1458    inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);
1459    try {
1460      region.increment(inc);
1461    } catch (IOException e) {
1462      exceptionCaught = true;
1463    }
1464    assertTrue(exceptionCaught == true);
1465  }
1466
1467  private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException {
1468    InternalScanner scanner = buildScanner(keyPrefix, value, r);
1469    int count = 0;
1470    boolean more = false;
1471    List<Cell> results = new ArrayList<>();
1472    do {
1473      more = scanner.next(results);
1474      if (results != null && !results.isEmpty())
1475        count++;
1476      else
1477        break;
1478      Delete delete = new Delete(CellUtil.cloneRow(results.get(0)));
1479      delete.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
1480      r.delete(delete);
1481      results.clear();
1482    } while (more);
1483    assertEquals("Did not perform correct number of deletes", 3, count);
1484  }
1485
1486  private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception {
1487    InternalScanner resultScanner = buildScanner(keyPrefix, value, r);
1488    int numberOfResults = 0;
1489    List<Cell> results = new ArrayList<>();
1490    boolean more = false;
1491    do {
1492      more = resultScanner.next(results);
1493      if (results != null && !results.isEmpty())
1494        numberOfResults++;
1495      else
1496        break;
1497      for (Cell kv : results) {
1498        System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv)));
1499      }
1500      results.clear();
1501    } while (more);
1502    return numberOfResults;
1503  }
1504
1505  private InternalScanner buildScanner(String keyPrefix, String value, HRegion r)
1506      throws IOException {
1507    // Defaults FilterList.Operator.MUST_PASS_ALL.
1508    FilterList allFilters = new FilterList();
1509    allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
1510    // Only return rows where this column value exists in the row.
1511    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"),
1512        Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value));
1513    filter.setFilterIfMissing(true);
1514    allFilters.addFilter(filter);
1515    Scan scan = new Scan();
1516    scan.addFamily(Bytes.toBytes("trans-blob"));
1517    scan.addFamily(Bytes.toBytes("trans-type"));
1518    scan.addFamily(Bytes.toBytes("trans-date"));
1519    scan.addFamily(Bytes.toBytes("trans-tags"));
1520    scan.addFamily(Bytes.toBytes("trans-group"));
1521    scan.setFilter(allFilters);
1522    return r.getScanner(scan);
1523  }
1524
1525  private void putRows(HRegion r, int numRows, String value, String key) throws IOException {
1526    for (int i = 0; i < numRows; i++) {
1527      String row = key + "_" + i/* UUID.randomUUID().toString() */;
1528      System.out.println(String.format("Saving row: %s, with value %s", row, value));
1529      Put put = new Put(Bytes.toBytes(row));
1530      put.setDurability(Durability.SKIP_WAL);
1531      put.addColumn(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob"));
1532      put.addColumn(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
1533      put.addColumn(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999"));
1534      put.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value));
1535      put.addColumn(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId"));
1536      r.put(put);
1537    }
1538  }
1539
1540  @Test
1541  public void testFamilyWithAndWithoutColon() throws Exception {
1542    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1543    this.region = initHRegion(tableName, method, CONF, cf);
1544    Put p = new Put(tableName.toBytes());
1545    byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
1546    p.addColumn(cfwithcolon, cfwithcolon, cfwithcolon);
1547    boolean exception = false;
1548    try {
1549      this.region.put(p);
1550    } catch (NoSuchColumnFamilyException e) {
1551      exception = true;
1552    }
1553    assertTrue(exception);
1554  }
1555
1556  @Test
1557  public void testBatchPut_whileNoRowLocksHeld() throws IOException {
1558    final Put[] puts = new Put[10];
1559    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1560    long syncs = prepareRegionForBachPut(puts, source, false);
1561
1562    OperationStatus[] codes = this.region.batchMutate(puts);
1563    assertEquals(10, codes.length);
1564    for (int i = 0; i < 10; i++) {
1565      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1566    }
1567    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1568
1569    LOG.info("Next a batch put with one invalid family");
1570    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1571    codes = this.region.batchMutate(puts);
1572    assertEquals(10, codes.length);
1573    for (int i = 0; i < 10; i++) {
1574      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1575          codes[i].getOperationStatusCode());
1576    }
1577
1578    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
1579  }
1580
  /**
   * Runs a batch mutation of ten puts (put 5 carries a bad family) on a separate thread while
   * this thread holds locks on row_1, row_2 and row_3, and concurrently asks another thread to
   * close the region. After the three locks are released the test waits for "syncTimeNumOps"
   * to advance by one, then checks that the batch reported BAD_FAMILY for put 5 and SUCCESS
   * for every other put.
   */
  @Test
  public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
    final Put[] puts = new Put[10];
    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
    long syncs = prepareRegionForBachPut(puts, source, false);

    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);

    LOG.info("batchPut will have to break into four batches to avoid row locks");
    // Locks are taken by the test thread before the putter thread is started.
    RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
    RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1"));
    RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
    // NOTE(review): the 'true' argument presumably requests a read lock on row_3 -- confirm
    // against HRegion#getRowLock(byte[], boolean). This lock is only released at the very end.
    RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);

    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
    final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>();
    final CountDownLatch startingPuts = new CountDownLatch(1);
    final CountDownLatch startingClose = new CountDownLatch(1);
    TestThread putter = new TestThread(ctx) {
      @Override
      public void doWork() throws IOException {
        // Signal that the batch is about to start, then publish its per-put statuses.
        startingPuts.countDown();
        retFromThread.set(region.batchMutate(puts));
      }
    };
    LOG.info("...starting put thread while holding locks");
    ctx.addThread(putter);
    ctx.startThreads();

    // Now attempt to close the region from another thread. Prior to HBASE-12565
    // this would cause the in-progress batchMutate operation to fail with an
    // exception because it used to release and re-acquire the close-guard lock
    // between batches. Caller then didn't get status indicating which writes succeeded.
    // We now expect this thread to block until the batchMutate call finishes.
    // This thread is constructed with ctx but is started directly rather than added to ctx.
    Thread regionCloseThread = new TestThread(ctx) {
      @Override
      public void doWork() {
        try {
          startingPuts.await();
          // Give some time for the batch mutate to get in.
          // We don't want to race with the mutate
          Thread.sleep(10);
          startingClose.countDown();
          HBaseTestingUtility.closeRegionAndWAL(region);
          // Clear the field once the region has been closed here.
          region = null;
        } catch (IOException e) {
          throw new RuntimeException(e);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    };
    regionCloseThread.start();

    // Wait until both the putter and the closer have signalled, then pause briefly before
    // releasing the row locks.
    startingClose.await();
    startingPuts.await();
    Thread.sleep(100);
    LOG.info("...releasing row lock 1, which should let put thread continue");
    rowLock1.release();
    rowLock2.release();
    rowLock3.release();
    waitForCounter(source, "syncTimeNumOps", syncs + 1);

    LOG.info("...joining on put thread");
    ctx.stop();
    regionCloseThread.join();

    // Only put 5 (the one with the bad family) is expected to have failed.
    OperationStatus[] codes = retFromThread.get();
    for (int i = 0; i < codes.length; i++) {
      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
          codes[i].getOperationStatusCode());
    }
    rowLock4.release();
  }
1655
1656  private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
1657      throws InterruptedException {
1658    long startWait = System.currentTimeMillis();
1659    long currentCount;
1660    while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
1661      Thread.sleep(100);
1662      if (System.currentTimeMillis() - startWait > 10000) {
1663        fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
1664            expectedCount, currentCount));
1665      }
1666    }
1667  }
1668
1669  @Test
1670  public void testAtomicBatchPut() throws IOException {
1671    final Put[] puts = new Put[10];
1672    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1673    long syncs = prepareRegionForBachPut(puts, source, false);
1674
1675    // 1. Straight forward case, should succeed
1676    MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true,
1677        HConstants.NO_NONCE, HConstants.NO_NONCE);
1678    OperationStatus[] codes = this.region.batchMutate(batchOp);
1679    assertEquals(10, codes.length);
1680    for (int i = 0; i < 10; i++) {
1681      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1682    }
1683    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1684
1685    // 2. Failed to get lock
1686    RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
1687    // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this
1688    // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread
1689    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1690    final AtomicReference<IOException> retFromThread = new AtomicReference<>();
1691    final CountDownLatch finishedPuts = new CountDownLatch(1);
1692    final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
1693        HConstants
1694        .NO_NONCE,
1695        HConstants.NO_NONCE);
1696    TestThread putter = new TestThread(ctx) {
1697      @Override
1698      public void doWork() throws IOException {
1699        try {
1700          region.batchMutate(finalBatchOp);
1701        } catch (IOException ioe) {
1702          LOG.error("test failed!", ioe);
1703          retFromThread.set(ioe);
1704        }
1705        finishedPuts.countDown();
1706      }
1707    };
1708    LOG.info("...starting put thread while holding locks");
1709    ctx.addThread(putter);
1710    ctx.startThreads();
1711    LOG.info("...waiting for batch puts while holding locks");
1712    try {
1713      finishedPuts.await();
1714    } catch (InterruptedException e) {
1715      LOG.error("Interrupted!", e);
1716    } finally {
1717      if (lock != null) {
1718        lock.release();
1719      }
1720    }
1721    assertNotNull(retFromThread.get());
1722    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1723
1724    // 3. Exception thrown in validation
1725    LOG.info("Next a batch put with one invalid family");
1726    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1727    batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
1728        HConstants.NO_NONCE);
1729    thrown.expect(NoSuchColumnFamilyException.class);
1730    this.region.batchMutate(batchOp);
1731  }
1732
1733  @Test
1734  public void testBatchPutWithTsSlop() throws Exception {
1735    // add data with a timestamp that is too recent for range. Ensure assert
1736    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1737    final Put[] puts = new Put[10];
1738    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1739
1740    long syncs = prepareRegionForBachPut(puts, source, true);
1741
1742    OperationStatus[] codes = this.region.batchMutate(puts);
1743    assertEquals(10, codes.length);
1744    for (int i = 0; i < 10; i++) {
1745      assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
1746    }
1747    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1748  }
1749
1750  /**
1751   * @return syncs initial syncTimeNumOps
1752   */
1753  private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source,
1754      boolean slop) throws IOException {
1755    this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
1756
1757    LOG.info("First a batch put with all valid puts");
1758    for (int i = 0; i < puts.length; i++) {
1759      puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) :
1760          new Put(Bytes.toBytes("row_" + i));
1761      puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value);
1762    }
1763
1764    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1765    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1766    return syncs;
1767  }
1768
1769  // ////////////////////////////////////////////////////////////////////////////
1770  // checkAndMutate tests
1771  // ////////////////////////////////////////////////////////////////////////////
  /**
   * Walks a single cell (row1, fam1, "qualifier") through a sequence of checkAndMutate calls
   * whose comparators are an empty byte array, concrete values, and finally a
   * {@link NullComparator}. Each step relies on the state left by the previous one, so the
   * statement order matters.
   */
  @Test
  public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
    byte[] row1 = Bytes.toBytes("row1");
    byte[] fam1 = Bytes.toBytes("fam1");
    byte[] qf1 = Bytes.toBytes("qualifier");
    byte[] emptyVal = new byte[] {};
    byte[] val1 = Bytes.toBytes("value1");
    byte[] val2 = Bytes.toBytes("value2");

    // Setting up region
    this.region = initHRegion(tableName, method, CONF, fam1);
    // Build a put carrying an empty value; it is only applied through checkAndMutate below.
    Put put = new Put(row1);
    put.addColumn(fam1, qf1, emptyVal);

    // checkAndPut guarded by the empty value; nothing has been written to the region yet and
    // the check is expected to pass.
    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
        new BinaryComparator(emptyVal), put);
    assertTrue(res);

    // Build a put carrying val1
    put = new Put(row1);
    put.addColumn(fam1, qf1, val1);

    // checkAndPut of val1, still guarded by the empty value; expected to pass
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
        new BinaryComparator(emptyVal), put);
    assertTrue(res);

    // not empty anymore: the same guarded put is now expected to be rejected
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
        new BinaryComparator(emptyVal), put);
    assertFalse(res);

    // checkAndDelete guarded by the empty value; expected to be rejected as well
    Delete delete = new Delete(row1);
    delete.addColumn(fam1, qf1);
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
        new BinaryComparator(emptyVal), delete);
    assertFalse(res);

    put = new Put(row1);
    put.addColumn(fam1, qf1, val2);
    // checkAndPut of val2 guarded by val1 (the current value); expected to pass
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
        new BinaryComparator(val1), put);
    assertTrue(res);

    // checkAndDelete guarded by val2 (the current value).
    // NOTE(review): addColumn is called twice for the same column; presumably this is
    // deliberate so that two versions are removed and the empty value becomes visible again,
    // which the next check relies on -- confirm against Delete#addColumn semantics.
    delete = new Delete(row1);
    delete.addColumn(fam1, qf1);
    delete.addColumn(fam1, qf1);
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
        new BinaryComparator(val2), delete);
    assertTrue(res);

    // Delete with no columns added, guarded by the empty value; expected to pass
    delete = new Delete(row1);
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
        new BinaryComparator(emptyVal), delete);
    assertTrue(res);

    // checkAndPut looking for a null value
    put = new Put(row1);
    put.addColumn(fam1, qf1, val1);

    res = region
        .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put);
    assertTrue(res);
  }
1840
1841  @Test
1842  public void testCheckAndMutate_WithWrongValue() throws IOException {
1843    byte[] row1 = Bytes.toBytes("row1");
1844    byte[] fam1 = Bytes.toBytes("fam1");
1845    byte[] qf1 = Bytes.toBytes("qualifier");
1846    byte[] val1 = Bytes.toBytes("value1");
1847    byte[] val2 = Bytes.toBytes("value2");
1848    BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
1849    BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
1850
1851    // Setting up region
1852    this.region = initHRegion(tableName, method, CONF, fam1);
1853    // Putting data in key
1854    Put put = new Put(row1);
1855    put.addColumn(fam1, qf1, val1);
1856    region.put(put);
1857
1858    // checkAndPut with wrong value
1859    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1860        new BinaryComparator(val2), put);
1861    assertEquals(false, res);
1862
1863    // checkAndDelete with wrong value
1864    Delete delete = new Delete(row1);
1865    delete.addFamily(fam1);
1866    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1867        new BinaryComparator(val2), put);
1868    assertEquals(false, res);
1869
1870    // Putting data in key
1871    put = new Put(row1);
1872    put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1873    region.put(put);
1874
1875    // checkAndPut with wrong value
1876    res =
1877        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1878            new BigDecimalComparator(bd2), put);
1879    assertEquals(false, res);
1880
1881    // checkAndDelete with wrong value
1882    delete = new Delete(row1);
1883    delete.addFamily(fam1);
1884    res =
1885        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1886            new BigDecimalComparator(bd2), put);
1887    assertEquals(false, res);
1888  }
1889
1890  @Test
1891  public void testCheckAndMutate_WithCorrectValue() throws IOException {
1892    byte[] row1 = Bytes.toBytes("row1");
1893    byte[] fam1 = Bytes.toBytes("fam1");
1894    byte[] qf1 = Bytes.toBytes("qualifier");
1895    byte[] val1 = Bytes.toBytes("value1");
1896    BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
1897
1898    // Setting up region
1899    this.region = initHRegion(tableName, method, CONF, fam1);
1900    // Putting data in key
1901    long now = System.currentTimeMillis();
1902    Put put = new Put(row1);
1903    put.addColumn(fam1, qf1, now, val1);
1904    region.put(put);
1905
1906    // checkAndPut with correct value
1907    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1908        new BinaryComparator(val1), put);
1909    assertEquals("First", true, res);
1910
1911    // checkAndDelete with correct value
1912    Delete delete = new Delete(row1, now + 1);
1913    delete.addColumn(fam1, qf1);
1914    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
1915        delete);
1916    assertEquals("Delete", true, res);
1917
1918    // Putting data in key
1919    put = new Put(row1);
1920    put.addColumn(fam1, qf1, now + 2, Bytes.toBytes(bd1));
1921    region.put(put);
1922
1923    // checkAndPut with correct value
1924    res =
1925        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1926            bd1), put);
1927    assertEquals("Second put", true, res);
1928
1929    // checkAndDelete with correct value
1930    delete = new Delete(row1, now + 3);
1931    delete.addColumn(fam1, qf1);
1932    res =
1933        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1934            bd1), delete);
1935    assertEquals("Second delete", true, res);
1936  }
1937
  /**
   * Exercises checkAndMutate with LESS, LESS_OR_EQUAL, GREATER and GREATER_OR_EQUAL. Every
   * step states the value currently stored ("original"), the comparator value, and the
   * expected outcome. Successful steps apply their put and so change the stored value for the
   * steps that follow, which makes the statement order significant.
   */
  @Test
  public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
    byte[] row1 = Bytes.toBytes("row1");
    byte[] fam1 = Bytes.toBytes("fam1");
    byte[] qf1 = Bytes.toBytes("qualifier");
    byte[] val1 = Bytes.toBytes("value1");
    byte[] val2 = Bytes.toBytes("value2");
    byte[] val3 = Bytes.toBytes("value3");
    byte[] val4 = Bytes.toBytes("value4");

    // Setting up region
    this.region = initHRegion(tableName, method, CONF, fam1);
    // Putting val3 in key
    Put put = new Put(row1);
    put.addColumn(fam1, qf1, val3);
    region.put(put);

    // Test CompareOperator.LESS: original = val3, compare with val3, fail
    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
        new BinaryComparator(val3), put);
    assertEquals(false, res);

    // Test CompareOperator.LESS: original = val3, compare with val4, fail
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
        new BinaryComparator(val4), put);
    assertEquals(false, res);

    // Test CompareOperator.LESS: original = val3, compare with val2,
    // succeed (now value = val2)
    put = new Put(row1);
    put.addColumn(fam1, qf1, val2);
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
        new BinaryComparator(val2), put);
    assertEquals(true, res);

    // Test CompareOperator.LESS_OR_EQUAL: original = val2, compare with val3, fail
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
        new BinaryComparator(val3), put);
    assertEquals(false, res);

    // Test CompareOperator.LESS_OR_EQUAL: original = val2, compare with val2,
    // succeed (value still = val2)
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
        new BinaryComparator(val2), put);
    assertEquals(true, res);

    // Test CompareOperator.LESS_OR_EQUAL: original = val2, compare with val1,
    // succeed (now value = val3)
    put = new Put(row1);
    put.addColumn(fam1, qf1, val3);
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
        new BinaryComparator(val1), put);
    assertEquals(true, res);

    // Test CompareOperator.GREATER: original = val3, compare with val3, fail
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
        new BinaryComparator(val3), put);
    assertEquals(false, res);

    // Test CompareOperator.GREATER: original = val3, compare with val2, fail
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
        new BinaryComparator(val2), put);
    assertEquals(false, res);

    // Test CompareOperator.GREATER: original = val3, compare with val4,
    // succeed (now value = val2)
    put = new Put(row1);
    put.addColumn(fam1, qf1, val2);
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
        new BinaryComparator(val4), put);
    assertEquals(true, res);

    // Test CompareOperator.GREATER_OR_EQUAL: original = val2, compare with val1, fail
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
        new BinaryComparator(val1), put);
    assertEquals(false, res);

    // Test CompareOperator.GREATER_OR_EQUAL: original = val2, compare with val2,
    // succeed (value still = val2)
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
        new BinaryComparator(val2), put);
    assertEquals(true, res);

    // Test CompareOperator.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
        new BinaryComparator(val3), put);
    assertEquals(true, res);
  }
2026
2027  @Test
2028  public void testCheckAndPut_ThatPutWasWritten() throws IOException {
2029    byte[] row1 = Bytes.toBytes("row1");
2030    byte[] fam1 = Bytes.toBytes("fam1");
2031    byte[] fam2 = Bytes.toBytes("fam2");
2032    byte[] qf1 = Bytes.toBytes("qualifier");
2033    byte[] val1 = Bytes.toBytes("value1");
2034    byte[] val2 = Bytes.toBytes("value2");
2035
2036    byte[][] families = { fam1, fam2 };
2037
2038    // Setting up region
2039    this.region = initHRegion(tableName, method, CONF, families);
2040    // Putting data in the key to check
2041    Put put = new Put(row1);
2042    put.addColumn(fam1, qf1, val1);
2043    region.put(put);
2044
2045    // Creating put to add
2046    long ts = System.currentTimeMillis();
2047    KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
2048    put = new Put(row1);
2049    put.add(kv);
2050
2051    // checkAndPut with wrong value
2052    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
2053        new BinaryComparator(val1), put);
2054    assertEquals(true, res);
2055
2056    Get get = new Get(row1);
2057    get.addColumn(fam2, qf1);
2058    Cell[] actual = region.get(get).rawCells();
2059
2060    Cell[] expected = { kv };
2061
2062    assertEquals(expected.length, actual.length);
2063    for (int i = 0; i < actual.length; i++) {
2064      assertEquals(expected[i], actual[i]);
2065    }
2066  }
2067
2068  @Test
2069  public void testCheckAndPut_wrongRowInPut() throws IOException {
2070    this.region = initHRegion(tableName, method, CONF, COLUMNS);
2071    Put put = new Put(row2);
2072    put.addColumn(fam1, qual1, value1);
2073    try {
2074      region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL,
2075          new BinaryComparator(value2), put);
2076      fail();
2077    } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
2078      // expected exception.
2079    }
2080  }
2081
2082  @Test
2083  public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
2084    byte[] row1 = Bytes.toBytes("row1");
2085    byte[] fam1 = Bytes.toBytes("fam1");
2086    byte[] fam2 = Bytes.toBytes("fam2");
2087    byte[] qf1 = Bytes.toBytes("qualifier1");
2088    byte[] qf2 = Bytes.toBytes("qualifier2");
2089    byte[] qf3 = Bytes.toBytes("qualifier3");
2090    byte[] val1 = Bytes.toBytes("value1");
2091    byte[] val2 = Bytes.toBytes("value2");
2092    byte[] val3 = Bytes.toBytes("value3");
2093    byte[] emptyVal = new byte[] {};
2094
2095    byte[][] families = { fam1, fam2 };
2096
2097    // Setting up region
2098    this.region = initHRegion(tableName, method, CONF, families);
2099    // Put content
2100    Put put = new Put(row1);
2101    put.addColumn(fam1, qf1, val1);
2102    region.put(put);
2103    Threads.sleep(2);
2104
2105    put = new Put(row1);
2106    put.addColumn(fam1, qf1, val2);
2107    put.addColumn(fam2, qf1, val3);
2108    put.addColumn(fam2, qf2, val2);
2109    put.addColumn(fam2, qf3, val1);
2110    put.addColumn(fam1, qf3, val1);
2111    region.put(put);
2112
2113    LOG.info("get={}", region.get(new Get(row1).addColumn(fam1, qf1)).toString());
2114
2115    // Multi-column delete
2116    Delete delete = new Delete(row1);
2117    delete.addColumn(fam1, qf1);
2118    delete.addColumn(fam2, qf1);
2119    delete.addColumn(fam1, qf3);
2120    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
2121        new BinaryComparator(val2), delete);
2122    assertEquals(true, res);
2123
2124    Get get = new Get(row1);
2125    get.addColumn(fam1, qf1);
2126    get.addColumn(fam1, qf3);
2127    get.addColumn(fam2, qf2);
2128    Result r = region.get(get);
2129    assertEquals(2, r.size());
2130    assertArrayEquals(val1, r.getValue(fam1, qf1));
2131    assertArrayEquals(val2, r.getValue(fam2, qf2));
2132
2133    // Family delete
2134    delete = new Delete(row1);
2135    delete.addFamily(fam2);
2136    res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL,
2137        new BinaryComparator(emptyVal), delete);
2138    assertEquals(true, res);
2139
2140    get = new Get(row1);
2141    r = region.get(get);
2142    assertEquals(1, r.size());
2143    assertArrayEquals(val1, r.getValue(fam1, qf1));
2144
2145    // Row delete
2146    delete = new Delete(row1);
2147    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
2148        delete);
2149    assertEquals(true, res);
2150    get = new Get(row1);
2151    r = region.get(get);
2152    assertEquals(0, r.size());
2153  }
2154
2155  @Test
2156  public void testCheckAndMutate_WithFilters() throws Throwable {
2157    final byte[] FAMILY = Bytes.toBytes("fam");
2158
2159    // Setting up region
2160    this.region = initHRegion(tableName, method, CONF, FAMILY);
2161
2162    // Put one row
2163    Put put = new Put(row);
2164    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
2165    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
2166    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
2167    region.put(put);
2168
2169    // Put with success
2170    boolean ok = region.checkAndMutate(row,
2171      new FilterList(
2172        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2173          Bytes.toBytes("a")),
2174        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
2175          Bytes.toBytes("b"))
2176      ),
2177      new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
2178    assertTrue(ok);
2179
2180    Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
2181    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
2182
2183    // Put with failure
2184    ok = region.checkAndMutate(row,
2185      new FilterList(
2186        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2187          Bytes.toBytes("a")),
2188        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
2189          Bytes.toBytes("c"))
2190      ),
2191      new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
2192    assertFalse(ok);
2193
2194    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty());
2195
2196    // Delete with success
2197    ok = region.checkAndMutate(row,
2198      new FilterList(
2199        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2200          Bytes.toBytes("a")),
2201        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
2202          Bytes.toBytes("b"))
2203      ),
2204      new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")));
2205    assertTrue(ok);
2206
2207    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty());
2208
2209    // Mutate with success
2210    ok = region.checkAndRowMutate(row,
2211      new FilterList(
2212        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2213          Bytes.toBytes("a")),
2214        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
2215          Bytes.toBytes("b"))
2216      ),
2217      new RowMutations(row)
2218        .add((Mutation) new Put(row)
2219          .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
2220        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
2221    assertTrue(ok);
2222
2223    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E")));
2224    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
2225
2226    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
2227  }
2228
2229  @Test
2230  public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable {
2231    final byte[] FAMILY = Bytes.toBytes("fam");
2232
2233    // Setting up region
2234    this.region = initHRegion(tableName, method, CONF, FAMILY);
2235
2236    // Put with specifying the timestamp
2237    region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
2238
2239    // Put with success
2240    boolean ok = region.checkAndMutate(row,
2241      new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2242        Bytes.toBytes("a")),
2243      TimeRange.between(0, 101),
2244      new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
2245    assertTrue(ok);
2246
2247    Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
2248    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
2249
2250    // Put with failure
2251    ok = region.checkAndMutate(row,
2252      new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2253        Bytes.toBytes("a")),
2254      TimeRange.between(0, 100),
2255      new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
2256    assertFalse(ok);
2257
2258    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty());
2259
2260    // Mutate with success
2261    ok = region.checkAndRowMutate(row,
2262      new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2263        Bytes.toBytes("a")),
2264      TimeRange.between(0, 101),
2265      new RowMutations(row)
2266        .add((Mutation) new Put(row)
2267          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
2268        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
2269    assertTrue(ok);
2270
2271    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
2272    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
2273
2274    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
2275  }
2276
2277  @Test
2278  public void testCheckAndMutate_wrongMutationType() throws Throwable {
2279    // Setting up region
2280    this.region = initHRegion(tableName, method, CONF, fam1);
2281
2282    try {
2283      region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(value1),
2284        new Increment(row).addColumn(fam1, qual1, 1));
2285      fail("should throw DoNotRetryIOException");
2286    } catch (DoNotRetryIOException e) {
2287      assertEquals("Action must be Put or Delete", e.getMessage());
2288    }
2289
2290    try {
2291      region.checkAndMutate(row,
2292        new SingleColumnValueFilter(fam1, qual1, CompareOperator.EQUAL, value1),
2293        new Increment(row).addColumn(fam1, qual1, 1));
2294      fail("should throw DoNotRetryIOException");
2295    } catch (DoNotRetryIOException e) {
2296      assertEquals("Action must be Put or Delete", e.getMessage());
2297    }
2298  }
2299
2300  @Test
2301  public void testCheckAndMutate_wrongRow() throws Throwable {
2302    final byte[] wrongRow = Bytes.toBytes("wrongRow");
2303
2304    // Setting up region
2305    this.region = initHRegion(tableName, method, CONF, fam1);
2306
2307    try {
2308      region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(value1),
2309        new Put(wrongRow).addColumn(fam1, qual1, value1));
2310      fail("should throw DoNotRetryIOException");
2311    } catch (DoNotRetryIOException e) {
2312      assertEquals("Action's getRow must match", e.getMessage());
2313    }
2314
2315    try {
2316      region.checkAndMutate(row,
2317        new SingleColumnValueFilter(fam1, qual1, CompareOperator.EQUAL, value1),
2318        new Put(wrongRow).addColumn(fam1, qual1, value1));
2319      fail("should throw DoNotRetryIOException");
2320    } catch (DoNotRetryIOException e) {
2321      assertEquals("Action's getRow must match", e.getMessage());
2322    }
2323
2324    try {
2325      region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL,
2326        new BinaryComparator(value1),
2327        new RowMutations(wrongRow)
2328          .add((Mutation) new Put(wrongRow)
2329            .addColumn(fam1, qual1, value1))
2330          .add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
2331      fail("should throw DoNotRetryIOException");
2332    } catch (DoNotRetryIOException e) {
2333      assertEquals("Action's getRow must match", e.getMessage());
2334    }
2335
2336    try {
2337      region.checkAndRowMutate(row,
2338        new SingleColumnValueFilter(fam1, qual1, CompareOperator.EQUAL, value1),
2339        new RowMutations(wrongRow)
2340          .add((Mutation) new Put(wrongRow)
2341            .addColumn(fam1, qual1, value1))
2342          .add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
2343      fail("should throw DoNotRetryIOException");
2344    } catch (DoNotRetryIOException e) {
2345      assertEquals("Action's getRow must match", e.getMessage());
2346    }
2347  }
2348
2349  // ////////////////////////////////////////////////////////////////////////////
2350  // Delete tests
2351  // ////////////////////////////////////////////////////////////////////////////
2352  @Test
2353  public void testDelete_multiDeleteColumn() throws IOException {
2354    byte[] row1 = Bytes.toBytes("row1");
2355    byte[] fam1 = Bytes.toBytes("fam1");
2356    byte[] qual = Bytes.toBytes("qualifier");
2357    byte[] value = Bytes.toBytes("value");
2358
2359    Put put = new Put(row1);
2360    put.addColumn(fam1, qual, 1, value);
2361    put.addColumn(fam1, qual, 2, value);
2362
2363    this.region = initHRegion(tableName, method, CONF, fam1);
2364    region.put(put);
2365
2366    // We do support deleting more than 1 'latest' version
2367    Delete delete = new Delete(row1);
2368    delete.addColumn(fam1, qual);
2369    delete.addColumn(fam1, qual);
2370    region.delete(delete);
2371
2372    Get get = new Get(row1);
2373    get.addFamily(fam1);
2374    Result r = region.get(get);
2375    assertEquals(0, r.size());
2376  }
2377
2378  @Test
2379  public void testDelete_CheckFamily() throws IOException {
2380    byte[] row1 = Bytes.toBytes("row1");
2381    byte[] fam1 = Bytes.toBytes("fam1");
2382    byte[] fam2 = Bytes.toBytes("fam2");
2383    byte[] fam3 = Bytes.toBytes("fam3");
2384    byte[] fam4 = Bytes.toBytes("fam4");
2385
2386    // Setting up region
2387    this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
2388    List<Cell> kvs = new ArrayList<>();
2389    kvs.add(new KeyValue(row1, fam4, null, null));
2390
2391    // testing existing family
2392    byte[] family = fam2;
2393    NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2394    deleteMap.put(family, kvs);
2395    region.delete(deleteMap, Durability.SYNC_WAL);
2396
2397    // testing non existing family
2398    boolean ok = false;
2399    family = fam4;
2400    try {
2401      deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2402      deleteMap.put(family, kvs);
2403      region.delete(deleteMap, Durability.SYNC_WAL);
2404    } catch (Exception e) {
2405      ok = true;
2406    }
2407    assertTrue("Family " + new String(family, StandardCharsets.UTF_8) + " does exist", ok);
2408  }
2409
2410  @Test
2411  public void testDelete_mixed() throws IOException, InterruptedException {
2412    byte[] fam = Bytes.toBytes("info");
2413    byte[][] families = { fam };
2414    this.region = initHRegion(tableName, method, CONF, families);
2415    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2416
2417    byte[] row = Bytes.toBytes("table_name");
2418    // column names
2419    byte[] serverinfo = Bytes.toBytes("serverinfo");
2420    byte[] splitA = Bytes.toBytes("splitA");
2421    byte[] splitB = Bytes.toBytes("splitB");
2422
2423    // add some data:
2424    Put put = new Put(row);
2425    put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2426    region.put(put);
2427
2428    put = new Put(row);
2429    put.addColumn(fam, splitB, Bytes.toBytes("reference_B"));
2430    region.put(put);
2431
2432    put = new Put(row);
2433    put.addColumn(fam, serverinfo, Bytes.toBytes("ip_address"));
2434    region.put(put);
2435
2436    // ok now delete a split:
2437    Delete delete = new Delete(row);
2438    delete.addColumns(fam, splitA);
2439    region.delete(delete);
2440
2441    // assert some things:
2442    Get get = new Get(row).addColumn(fam, serverinfo);
2443    Result result = region.get(get);
2444    assertEquals(1, result.size());
2445
2446    get = new Get(row).addColumn(fam, splitA);
2447    result = region.get(get);
2448    assertEquals(0, result.size());
2449
2450    get = new Get(row).addColumn(fam, splitB);
2451    result = region.get(get);
2452    assertEquals(1, result.size());
2453
2454    // Assert that after a delete, I can put.
2455    put = new Put(row);
2456    put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2457    region.put(put);
2458    get = new Get(row);
2459    result = region.get(get);
2460    assertEquals(3, result.size());
2461
2462    // Now delete all... then test I can add stuff back
2463    delete = new Delete(row);
2464    region.delete(delete);
2465    assertEquals(0, region.get(get).size());
2466
2467    region.put(new Put(row).addColumn(fam, splitA, Bytes.toBytes("reference_A")));
2468    result = region.get(get);
2469    assertEquals(1, result.size());
2470  }
2471
2472  @Test
2473  public void testDeleteRowWithFutureTs() throws IOException {
2474    byte[] fam = Bytes.toBytes("info");
2475    byte[][] families = { fam };
2476    this.region = initHRegion(tableName, method, CONF, families);
2477    byte[] row = Bytes.toBytes("table_name");
2478    // column names
2479    byte[] serverinfo = Bytes.toBytes("serverinfo");
2480
2481    // add data in the far future
2482    Put put = new Put(row);
2483    put.addColumn(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value"));
2484    region.put(put);
2485
2486    // now delete something in the present
2487    Delete delete = new Delete(row);
2488    region.delete(delete);
2489
2490    // make sure we still see our data
2491    Get get = new Get(row).addColumn(fam, serverinfo);
2492    Result result = region.get(get);
2493    assertEquals(1, result.size());
2494
2495    // delete the future row
2496    delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3);
2497    region.delete(delete);
2498
2499    // make sure it is gone
2500    get = new Get(row).addColumn(fam, serverinfo);
2501    result = region.get(get);
2502    assertEquals(0, result.size());
2503  }
2504
2505  /**
2506   * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
2507   * the actual timestamp
2508   */
2509  @Test
2510  public void testPutWithLatestTS() throws IOException {
2511    byte[] fam = Bytes.toBytes("info");
2512    byte[][] families = { fam };
2513    this.region = initHRegion(tableName, method, CONF, families);
2514    byte[] row = Bytes.toBytes("row1");
2515    // column names
2516    byte[] qual = Bytes.toBytes("qual");
2517
2518    // add data with LATEST_TIMESTAMP, put without WAL
2519    Put put = new Put(row);
2520    put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2521    region.put(put);
2522
2523    // Make sure it shows up with an actual timestamp
2524    Get get = new Get(row).addColumn(fam, qual);
2525    Result result = region.get(get);
2526    assertEquals(1, result.size());
2527    Cell kv = result.rawCells()[0];
2528    LOG.info("Got: " + kv);
2529    assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2530        kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2531
2532    // Check same with WAL enabled (historically these took different
2533    // code paths, so check both)
2534    row = Bytes.toBytes("row2");
2535    put = new Put(row);
2536    put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2537    region.put(put);
2538
2539    // Make sure it shows up with an actual timestamp
2540    get = new Get(row).addColumn(fam, qual);
2541    result = region.get(get);
2542    assertEquals(1, result.size());
2543    kv = result.rawCells()[0];
2544    LOG.info("Got: " + kv);
2545    assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2546        kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2547  }
2548
2549  /**
2550   * Tests that there is server-side filtering for invalid timestamp upper
2551   * bound. Note that the timestamp lower bound is automatically handled for us
2552   * by the TTL field.
2553   */
2554  @Test
2555  public void testPutWithTsSlop() throws IOException {
2556    byte[] fam = Bytes.toBytes("info");
2557    byte[][] families = { fam };
2558
2559    // add data with a timestamp that is too recent for range. Ensure assert
2560    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
2561    this.region = initHRegion(tableName, method, CONF, families);
2562    boolean caughtExcep = false;
2563    try {
2564      // no TS specified == use latest. should not error
2565      region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), Bytes.toBytes("value")));
2566      // TS out of range. should error
2567      region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"),
2568          System.currentTimeMillis() + 2000, Bytes.toBytes("value")));
2569      fail("Expected IOE for TS out of configured timerange");
2570    } catch (FailedSanityCheckException ioe) {
2571      LOG.debug("Received expected exception", ioe);
2572      caughtExcep = true;
2573    }
2574    assertTrue("Should catch FailedSanityCheckException", caughtExcep);
2575  }
2576
2577  @Test
2578  public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
2579    byte[] fam1 = Bytes.toBytes("columnA");
2580    byte[] fam2 = Bytes.toBytes("columnB");
2581    this.region = initHRegion(tableName, method, CONF, fam1, fam2);
2582    byte[] rowA = Bytes.toBytes("rowA");
2583    byte[] rowB = Bytes.toBytes("rowB");
2584
2585    byte[] value = Bytes.toBytes("value");
2586
2587    Delete delete = new Delete(rowA);
2588    delete.addFamily(fam1);
2589
2590    region.delete(delete);
2591
2592    // now create data.
2593    Put put = new Put(rowA);
2594    put.addColumn(fam2, null, value);
2595    region.put(put);
2596
2597    put = new Put(rowB);
2598    put.addColumn(fam1, null, value);
2599    put.addColumn(fam2, null, value);
2600    region.put(put);
2601
2602    Scan scan = new Scan();
2603    scan.addFamily(fam1).addFamily(fam2);
2604    InternalScanner s = region.getScanner(scan);
2605    List<Cell> results = new ArrayList<>();
2606    s.next(results);
2607    assertTrue(CellUtil.matchingRows(results.get(0), rowA));
2608
2609    results.clear();
2610    s.next(results);
2611    assertTrue(CellUtil.matchingRows(results.get(0), rowB));
2612  }
2613
2614  @Test
2615  public void testDataInMemoryWithoutWAL() throws IOException {
2616    FileSystem fs = FileSystem.get(CONF);
2617    Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
2618    FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
2619    hLog.init();
2620    // This chunk creation is done throughout the code base. Do we want to move it into core?
2621    // It is missing from this test. W/o it we NPE.
2622    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
2623        COLUMN_FAMILY_BYTES);
2624
2625    Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2626      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1);
2627    final long originalSize = originalCell.getSerializedSize();
2628
2629    Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2630      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx"));
2631    final long addSize = addCell.getSerializedSize();
2632
2633    LOG.info("originalSize:" + originalSize
2634      + ", addSize:" + addSize);
2635    // start test. We expect that the addPut's durability will be replaced
2636    // by originalPut's durability.
2637
2638    // case 1:
2639    testDataInMemoryWithoutWAL(region,
2640            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2641            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2642            originalSize + addSize);
2643
2644    // case 2:
2645    testDataInMemoryWithoutWAL(region,
2646            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2647            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2648            originalSize + addSize);
2649
2650    // case 3:
2651    testDataInMemoryWithoutWAL(region,
2652            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2653            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2654            0);
2655
2656    // case 4:
2657    testDataInMemoryWithoutWAL(region,
2658            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2659            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2660            0);
2661  }
2662
2663  private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut,
2664          final Put addPut, long delta) throws IOException {
2665    final long initSize = region.getDataInMemoryWithoutWAL();
2666    // save normalCPHost and replaced by mockedCPHost
2667    RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
2668    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
2669    // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must
2670    // do below format (from Mockito doc).
2671    Mockito.doAnswer(new Answer() {
2672      @Override
2673      public Object answer(InvocationOnMock invocation) throws Throwable {
2674        MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0);
2675        mb.addOperationsFromCP(0, new Mutation[]{addPut});
2676        return null;
2677      }
2678    }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
2679    ColumnFamilyDescriptorBuilder builder = ColumnFamilyDescriptorBuilder.
2680        newBuilder(COLUMN_FAMILY_BYTES);
2681    ScanInfo info = new ScanInfo(CONF, builder.build(), Long.MAX_VALUE,
2682        Long.MAX_VALUE, region.getCellComparator());
2683    Mockito.when(mockedCPHost.preFlushScannerOpen(Mockito.any(HStore.class),
2684        Mockito.any())).thenReturn(info);
2685    Mockito.when(mockedCPHost.preFlush(Mockito.any(), Mockito.any(StoreScanner.class),
2686        Mockito.any())).thenAnswer(i -> i.getArgument(1));
2687    region.setCoprocessorHost(mockedCPHost);
2688
2689    region.put(originalPut);
2690    region.setCoprocessorHost(normalCPHost);
2691    final long finalSize = region.getDataInMemoryWithoutWAL();
2692    assertEquals("finalSize:" + finalSize + ", initSize:"
2693      + initSize + ", delta:" + delta,finalSize, initSize + delta);
2694  }
2695
2696  @Test
2697  public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
2698    Delete delete = new Delete(row);
2699    delete.addColumns(fam1, qual1);
2700    doTestDelete_AndPostInsert(delete);
2701  }
2702
2703  @Test
2704  public void testaddFamily_PostInsert() throws IOException, InterruptedException {
2705    Delete delete = new Delete(row);
2706    delete.addFamily(fam1);
2707    doTestDelete_AndPostInsert(delete);
2708  }
2709
2710  public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
2711    this.region = initHRegion(tableName, method, CONF, fam1);
2712    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2713    Put put = new Put(row);
2714    put.addColumn(fam1, qual1, value1);
2715    region.put(put);
2716
2717    // now delete the value:
2718    region.delete(delete);
2719
2720    // ok put data:
2721    put = new Put(row);
2722    put.addColumn(fam1, qual1, value2);
2723    region.put(put);
2724
2725    // ok get:
2726    Get get = new Get(row);
2727    get.addColumn(fam1, qual1);
2728
2729    Result r = region.get(get);
2730    assertEquals(1, r.size());
2731    assertArrayEquals(value2, r.getValue(fam1, qual1));
2732
2733    // next:
2734    Scan scan = new Scan(row);
2735    scan.addColumn(fam1, qual1);
2736    InternalScanner s = region.getScanner(scan);
2737
2738    List<Cell> results = new ArrayList<>();
2739    assertEquals(false, s.next(results));
2740    assertEquals(1, results.size());
2741    Cell kv = results.get(0);
2742
2743    assertArrayEquals(value2, CellUtil.cloneValue(kv));
2744    assertArrayEquals(fam1, CellUtil.cloneFamily(kv));
2745    assertArrayEquals(qual1, CellUtil.cloneQualifier(kv));
2746    assertArrayEquals(row, CellUtil.cloneRow(kv));
2747  }
2748
2749  @Test
2750  public void testDelete_CheckTimestampUpdated() throws IOException {
2751    byte[] row1 = Bytes.toBytes("row1");
2752    byte[] col1 = Bytes.toBytes("col1");
2753    byte[] col2 = Bytes.toBytes("col2");
2754    byte[] col3 = Bytes.toBytes("col3");
2755
2756    // Setting up region
2757    this.region = initHRegion(tableName, method, CONF, fam1);
2758    // Building checkerList
2759    List<Cell> kvs = new ArrayList<>();
2760    kvs.add(new KeyValue(row1, fam1, col1, null));
2761    kvs.add(new KeyValue(row1, fam1, col2, null));
2762    kvs.add(new KeyValue(row1, fam1, col3, null));
2763
2764    NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2765    deleteMap.put(fam1, kvs);
2766    region.delete(deleteMap, Durability.SYNC_WAL);
2767
2768    // extract the key values out the memstore:
2769    // This is kinda hacky, but better than nothing...
2770    long now = System.currentTimeMillis();
2771    AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore;
2772    Cell firstCell = memstore.getActive().first();
2773    assertTrue(firstCell.getTimestamp() <= now);
2774    now = firstCell.getTimestamp();
2775    for (Cell cell : memstore.getActive().getCellSet()) {
2776      assertTrue(cell.getTimestamp() <= now);
2777      now = cell.getTimestamp();
2778    }
2779  }
2780
2781  // ////////////////////////////////////////////////////////////////////////////
2782  // Get tests
2783  // ////////////////////////////////////////////////////////////////////////////
2784  @Test
2785  public void testGet_FamilyChecker() throws IOException {
2786    byte[] row1 = Bytes.toBytes("row1");
2787    byte[] fam1 = Bytes.toBytes("fam1");
2788    byte[] fam2 = Bytes.toBytes("False");
2789    byte[] col1 = Bytes.toBytes("col1");
2790
2791    // Setting up region
2792    this.region = initHRegion(tableName, method, CONF, fam1);
2793    Get get = new Get(row1);
2794    get.addColumn(fam2, col1);
2795
2796    // Test
2797    try {
2798      region.get(get);
2799      fail("Expecting DoNotRetryIOException in get but did not get any");
2800    } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
2801      LOG.info("Got expected DoNotRetryIOException successfully");
2802    }
2803  }
2804
2805  @Test
2806  public void testGet_Basic() throws IOException {
2807    byte[] row1 = Bytes.toBytes("row1");
2808    byte[] fam1 = Bytes.toBytes("fam1");
2809    byte[] col1 = Bytes.toBytes("col1");
2810    byte[] col2 = Bytes.toBytes("col2");
2811    byte[] col3 = Bytes.toBytes("col3");
2812    byte[] col4 = Bytes.toBytes("col4");
2813    byte[] col5 = Bytes.toBytes("col5");
2814
2815    // Setting up region
2816    this.region = initHRegion(tableName, method, CONF, fam1);
2817    // Add to memstore
2818    Put put = new Put(row1);
2819    put.addColumn(fam1, col1, null);
2820    put.addColumn(fam1, col2, null);
2821    put.addColumn(fam1, col3, null);
2822    put.addColumn(fam1, col4, null);
2823    put.addColumn(fam1, col5, null);
2824    region.put(put);
2825
2826    Get get = new Get(row1);
2827    get.addColumn(fam1, col2);
2828    get.addColumn(fam1, col4);
2829    // Expected result
2830    KeyValue kv1 = new KeyValue(row1, fam1, col2);
2831    KeyValue kv2 = new KeyValue(row1, fam1, col4);
2832    KeyValue[] expected = { kv1, kv2 };
2833
2834    // Test
2835    Result res = region.get(get);
2836    assertEquals(expected.length, res.size());
2837    for (int i = 0; i < res.size(); i++) {
2838      assertTrue(CellUtil.matchingRows(expected[i], res.rawCells()[i]));
2839      assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i]));
2840      assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i]));
2841    }
2842
2843    // Test using a filter on a Get
2844    Get g = new Get(row1);
2845    final int count = 2;
2846    g.setFilter(new ColumnCountGetFilter(count));
2847    res = region.get(g);
2848    assertEquals(count, res.size());
2849  }
2850
2851  @Test
2852  public void testGet_Empty() throws IOException {
2853    byte[] row = Bytes.toBytes("row");
2854    byte[] fam = Bytes.toBytes("fam");
2855
2856    this.region = initHRegion(tableName, method, CONF, fam);
2857    Get get = new Get(row);
2858    get.addFamily(fam);
2859    Result r = region.get(get);
2860
2861    assertTrue(r.isEmpty());
2862  }
2863
2864  @Test
2865  public void testGetWithFilter() throws IOException, InterruptedException {
2866    byte[] row1 = Bytes.toBytes("row1");
2867    byte[] fam1 = Bytes.toBytes("fam1");
2868    byte[] col1 = Bytes.toBytes("col1");
2869    byte[] value1 = Bytes.toBytes("value1");
2870    byte[] value2 = Bytes.toBytes("value2");
2871
2872    final int maxVersions = 3;
2873    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
2874    hcd.setMaxVersions(maxVersions);
2875    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker"));
2876    htd.addFamily(hcd);
2877    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2878    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
2879    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
2880    final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
2881    this.region = TEST_UTIL.createLocalHRegion(info, htd, wal);
2882
2883    // Put 4 version to memstore
2884    long ts = 0;
2885    Put put = new Put(row1, ts);
2886    put.addColumn(fam1, col1, value1);
2887    region.put(put);
2888    put = new Put(row1, ts + 1);
2889    put.addColumn(fam1, col1, Bytes.toBytes("filter1"));
2890    region.put(put);
2891    put = new Put(row1, ts + 2);
2892    put.addColumn(fam1, col1, Bytes.toBytes("filter2"));
2893    region.put(put);
2894    put = new Put(row1, ts + 3);
2895    put.addColumn(fam1, col1, value2);
2896    region.put(put);
2897
2898    Get get = new Get(row1);
2899    get.setMaxVersions();
2900    Result res = region.get(get);
2901    // Get 3 versions, the oldest version has gone from user view
2902    assertEquals(maxVersions, res.size());
2903
2904    get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value")));
2905    res = region.get(get);
2906    // When use value filter, the oldest version should still gone from user view and it
2907    // should only return one key vaule
2908    assertEquals(1, res.size());
2909    assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2910    assertEquals(ts + 3, res.rawCells()[0].getTimestamp());
2911
2912    region.flush(true);
2913    region.compact(true);
2914    Thread.sleep(1000);
2915    res = region.get(get);
2916    // After flush and compact, the result should be consistent with previous result
2917    assertEquals(1, res.size());
2918    assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2919  }
2920
2921  // ////////////////////////////////////////////////////////////////////////////
2922  // Scanner tests
2923  // ////////////////////////////////////////////////////////////////////////////
2924  @Test
2925  public void testGetScanner_WithOkFamilies() throws IOException {
2926    byte[] fam1 = Bytes.toBytes("fam1");
2927    byte[] fam2 = Bytes.toBytes("fam2");
2928
2929    byte[][] families = { fam1, fam2 };
2930
2931    // Setting up region
2932    this.region = initHRegion(tableName, method, CONF, families);
2933    Scan scan = new Scan();
2934    scan.addFamily(fam1);
2935    scan.addFamily(fam2);
2936    try {
2937      region.getScanner(scan);
2938    } catch (Exception e) {
2939      assertTrue("Families could not be found in Region", false);
2940    }
2941  }
2942
2943  @Test
2944  public void testGetScanner_WithNotOkFamilies() throws IOException {
2945    byte[] fam1 = Bytes.toBytes("fam1");
2946    byte[] fam2 = Bytes.toBytes("fam2");
2947
2948    byte[][] families = { fam1 };
2949
2950    // Setting up region
2951    this.region = initHRegion(tableName, method, CONF, families);
2952    Scan scan = new Scan();
2953    scan.addFamily(fam2);
2954    boolean ok = false;
2955    try {
2956      region.getScanner(scan);
2957    } catch (Exception e) {
2958      ok = true;
2959    }
2960    assertTrue("Families could not be found in Region", ok);
2961  }
2962
2963  @Test
2964  public void testGetScanner_WithNoFamilies() throws IOException {
2965    byte[] row1 = Bytes.toBytes("row1");
2966    byte[] fam1 = Bytes.toBytes("fam1");
2967    byte[] fam2 = Bytes.toBytes("fam2");
2968    byte[] fam3 = Bytes.toBytes("fam3");
2969    byte[] fam4 = Bytes.toBytes("fam4");
2970
2971    byte[][] families = { fam1, fam2, fam3, fam4 };
2972
2973    // Setting up region
2974    this.region = initHRegion(tableName, method, CONF, families);
2975    // Putting data in Region
2976    Put put = new Put(row1);
2977    put.addColumn(fam1, null, null);
2978    put.addColumn(fam2, null, null);
2979    put.addColumn(fam3, null, null);
2980    put.addColumn(fam4, null, null);
2981    region.put(put);
2982
2983    Scan scan = null;
2984    HRegion.RegionScannerImpl is = null;
2985
2986    // Testing to see how many scanners that is produced by getScanner,
2987    // starting
2988    // with known number, 2 - current = 1
2989    scan = new Scan();
2990    scan.addFamily(fam2);
2991    scan.addFamily(fam4);
2992    is = region.getScanner(scan);
2993    assertEquals(1, is.storeHeap.getHeap().size());
2994
2995    scan = new Scan();
2996    is = region.getScanner(scan);
2997    assertEquals(families.length - 1, is.storeHeap.getHeap().size());
2998  }
2999
3000  /**
3001   * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
3002   *
3003   * @throws IOException
3004   */
3005  @Test
3006  public void testGetScanner_WithRegionClosed() throws IOException {
3007    byte[] fam1 = Bytes.toBytes("fam1");
3008    byte[] fam2 = Bytes.toBytes("fam2");
3009
3010    byte[][] families = { fam1, fam2 };
3011
3012    // Setting up region
3013    try {
3014      this.region = initHRegion(tableName, method, CONF, families);
3015    } catch (IOException e) {
3016      e.printStackTrace();
3017      fail("Got IOException during initHRegion, " + e.getMessage());
3018    }
3019    region.closed.set(true);
3020    try {
3021      region.getScanner(null);
3022      fail("Expected to get an exception during getScanner on a region that is closed");
3023    } catch (NotServingRegionException e) {
3024      // this is the correct exception that is expected
3025    } catch (IOException e) {
3026      fail("Got wrong type of exception - should be a NotServingRegionException, " +
3027          "but was an IOException: "
3028          + e.getMessage());
3029    }
3030  }
3031
3032  @Test
3033  public void testRegionScanner_Next() throws IOException {
3034    byte[] row1 = Bytes.toBytes("row1");
3035    byte[] row2 = Bytes.toBytes("row2");
3036    byte[] fam1 = Bytes.toBytes("fam1");
3037    byte[] fam2 = Bytes.toBytes("fam2");
3038    byte[] fam3 = Bytes.toBytes("fam3");
3039    byte[] fam4 = Bytes.toBytes("fam4");
3040
3041    byte[][] families = { fam1, fam2, fam3, fam4 };
3042    long ts = System.currentTimeMillis();
3043
3044    // Setting up region
3045    this.region = initHRegion(tableName, method, CONF, families);
3046    // Putting data in Region
3047    Put put = null;
3048    put = new Put(row1);
3049    put.addColumn(fam1, (byte[]) null, ts, null);
3050    put.addColumn(fam2, (byte[]) null, ts, null);
3051    put.addColumn(fam3, (byte[]) null, ts, null);
3052    put.addColumn(fam4, (byte[]) null, ts, null);
3053    region.put(put);
3054
3055    put = new Put(row2);
3056    put.addColumn(fam1, (byte[]) null, ts, null);
3057    put.addColumn(fam2, (byte[]) null, ts, null);
3058    put.addColumn(fam3, (byte[]) null, ts, null);
3059    put.addColumn(fam4, (byte[]) null, ts, null);
3060    region.put(put);
3061
3062    Scan scan = new Scan();
3063    scan.addFamily(fam2);
3064    scan.addFamily(fam4);
3065    InternalScanner is = region.getScanner(scan);
3066
3067    List<Cell> res = null;
3068
3069    // Result 1
3070    List<Cell> expected1 = new ArrayList<>();
3071    expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
3072    expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
3073
3074    res = new ArrayList<>();
3075    is.next(res);
3076    for (int i = 0; i < res.size(); i++) {
3077      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected1.get(i), res.get(i)));
3078    }
3079
3080    // Result 2
3081    List<Cell> expected2 = new ArrayList<>();
3082    expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
3083    expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
3084
3085    res = new ArrayList<>();
3086    is.next(res);
3087    for (int i = 0; i < res.size(); i++) {
3088      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected2.get(i), res.get(i)));
3089    }
3090  }
3091
3092  @Test
3093  public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException {
3094    byte[] row1 = Bytes.toBytes("row1");
3095    byte[] qf1 = Bytes.toBytes("qualifier1");
3096    byte[] qf2 = Bytes.toBytes("qualifier2");
3097    byte[] fam1 = Bytes.toBytes("fam1");
3098    byte[][] families = { fam1 };
3099
3100    long ts1 = System.currentTimeMillis();
3101    long ts2 = ts1 + 1;
3102    long ts3 = ts1 + 2;
3103
3104    // Setting up region
3105    this.region = initHRegion(tableName, method, CONF, families);
3106    // Putting data in Region
3107    Put put = null;
3108    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3109    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3110    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3111
3112    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3113    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3114    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3115
3116    put = new Put(row1);
3117    put.add(kv13);
3118    put.add(kv12);
3119    put.add(kv11);
3120    put.add(kv23);
3121    put.add(kv22);
3122    put.add(kv21);
3123    region.put(put);
3124
3125    // Expected
3126    List<Cell> expected = new ArrayList<>();
3127    expected.add(kv13);
3128    expected.add(kv12);
3129
3130    Scan scan = new Scan(row1);
3131    scan.addColumn(fam1, qf1);
3132    scan.setMaxVersions(MAX_VERSIONS);
3133    List<Cell> actual = new ArrayList<>();
3134    InternalScanner scanner = region.getScanner(scan);
3135
3136    boolean hasNext = scanner.next(actual);
3137    assertEquals(false, hasNext);
3138
3139    // Verify result
3140    for (int i = 0; i < expected.size(); i++) {
3141      assertEquals(expected.get(i), actual.get(i));
3142    }
3143  }
3144
3145  @Test
3146  public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException {
3147    byte[] row1 = Bytes.toBytes("row1");
3148    byte[] qf1 = Bytes.toBytes("qualifier1");
3149    byte[] qf2 = Bytes.toBytes("qualifier2");
3150    byte[] fam1 = Bytes.toBytes("fam1");
3151    byte[][] families = { fam1 };
3152
3153    long ts1 = 1; // System.currentTimeMillis();
3154    long ts2 = ts1 + 1;
3155    long ts3 = ts1 + 2;
3156
3157    // Setting up region
3158    this.region = initHRegion(tableName, method, CONF, families);
3159    // Putting data in Region
3160    Put put = null;
3161    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3162    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3163    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3164
3165    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3166    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3167    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3168
3169    put = new Put(row1);
3170    put.add(kv13);
3171    put.add(kv12);
3172    put.add(kv11);
3173    put.add(kv23);
3174    put.add(kv22);
3175    put.add(kv21);
3176    region.put(put);
3177    region.flush(true);
3178
3179    // Expected
3180    List<Cell> expected = new ArrayList<>();
3181    expected.add(kv13);
3182    expected.add(kv12);
3183    expected.add(kv23);
3184    expected.add(kv22);
3185
3186    Scan scan = new Scan(row1);
3187    scan.addColumn(fam1, qf1);
3188    scan.addColumn(fam1, qf2);
3189    scan.setMaxVersions(MAX_VERSIONS);
3190    List<Cell> actual = new ArrayList<>();
3191    InternalScanner scanner = region.getScanner(scan);
3192
3193    boolean hasNext = scanner.next(actual);
3194    assertEquals(false, hasNext);
3195
3196    // Verify result
3197    for (int i = 0; i < expected.size(); i++) {
3198      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3199    }
3200  }
3201
3202  @Test
3203  public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws
3204      IOException {
3205    byte[] row1 = Bytes.toBytes("row1");
3206    byte[] fam1 = Bytes.toBytes("fam1");
3207    byte[][] families = { fam1 };
3208    byte[] qf1 = Bytes.toBytes("qualifier1");
3209    byte[] qf2 = Bytes.toBytes("qualifier2");
3210
3211    long ts1 = 1;
3212    long ts2 = ts1 + 1;
3213    long ts3 = ts1 + 2;
3214    long ts4 = ts1 + 3;
3215
3216    // Setting up region
3217    this.region = initHRegion(tableName, method, CONF, families);
3218    // Putting data in Region
3219    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3220    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3221    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3222    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3223
3224    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3225    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3226    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3227    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3228
3229    Put put = null;
3230    put = new Put(row1);
3231    put.add(kv14);
3232    put.add(kv24);
3233    region.put(put);
3234    region.flush(true);
3235
3236    put = new Put(row1);
3237    put.add(kv23);
3238    put.add(kv13);
3239    region.put(put);
3240    region.flush(true);
3241
3242    put = new Put(row1);
3243    put.add(kv22);
3244    put.add(kv12);
3245    region.put(put);
3246    region.flush(true);
3247
3248    put = new Put(row1);
3249    put.add(kv21);
3250    put.add(kv11);
3251    region.put(put);
3252
3253    // Expected
3254    List<Cell> expected = new ArrayList<>();
3255    expected.add(kv14);
3256    expected.add(kv13);
3257    expected.add(kv12);
3258    expected.add(kv24);
3259    expected.add(kv23);
3260    expected.add(kv22);
3261
3262    Scan scan = new Scan(row1);
3263    scan.addColumn(fam1, qf1);
3264    scan.addColumn(fam1, qf2);
3265    int versions = 3;
3266    scan.setMaxVersions(versions);
3267    List<Cell> actual = new ArrayList<>();
3268    InternalScanner scanner = region.getScanner(scan);
3269
3270    boolean hasNext = scanner.next(actual);
3271    assertEquals(false, hasNext);
3272
3273    // Verify result
3274    for (int i = 0; i < expected.size(); i++) {
3275      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3276    }
3277  }
3278
3279  @Test
3280  public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException {
3281    byte[] row1 = Bytes.toBytes("row1");
3282    byte[] qf1 = Bytes.toBytes("qualifier1");
3283    byte[] qf2 = Bytes.toBytes("qualifier2");
3284    byte[] fam1 = Bytes.toBytes("fam1");
3285    byte[][] families = { fam1 };
3286
3287    long ts1 = System.currentTimeMillis();
3288    long ts2 = ts1 + 1;
3289    long ts3 = ts1 + 2;
3290
3291    // Setting up region
3292    this.region = initHRegion(tableName, method, CONF, families);
3293    // Putting data in Region
3294    Put put = null;
3295    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3296    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3297    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3298
3299    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3300    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3301    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3302
3303    put = new Put(row1);
3304    put.add(kv13);
3305    put.add(kv12);
3306    put.add(kv11);
3307    put.add(kv23);
3308    put.add(kv22);
3309    put.add(kv21);
3310    region.put(put);
3311
3312    // Expected
3313    List<Cell> expected = new ArrayList<>();
3314    expected.add(kv13);
3315    expected.add(kv12);
3316    expected.add(kv23);
3317    expected.add(kv22);
3318
3319    Scan scan = new Scan(row1);
3320    scan.addFamily(fam1);
3321    scan.setMaxVersions(MAX_VERSIONS);
3322    List<Cell> actual = new ArrayList<>();
3323    InternalScanner scanner = region.getScanner(scan);
3324
3325    boolean hasNext = scanner.next(actual);
3326    assertEquals(false, hasNext);
3327
3328    // Verify result
3329    for (int i = 0; i < expected.size(); i++) {
3330      assertEquals(expected.get(i), actual.get(i));
3331    }
3332  }
3333
3334  @Test
3335  public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException {
3336    byte[] row1 = Bytes.toBytes("row1");
3337    byte[] qf1 = Bytes.toBytes("qualifier1");
3338    byte[] qf2 = Bytes.toBytes("qualifier2");
3339    byte[] fam1 = Bytes.toBytes("fam1");
3340
3341    long ts1 = 1; // System.currentTimeMillis();
3342    long ts2 = ts1 + 1;
3343    long ts3 = ts1 + 2;
3344
3345    // Setting up region
3346    this.region = initHRegion(tableName, method, CONF, fam1);
3347    // Putting data in Region
3348    Put put = null;
3349    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3350    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3351    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3352
3353    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3354    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3355    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3356
3357    put = new Put(row1);
3358    put.add(kv13);
3359    put.add(kv12);
3360    put.add(kv11);
3361    put.add(kv23);
3362    put.add(kv22);
3363    put.add(kv21);
3364    region.put(put);
3365    region.flush(true);
3366
3367    // Expected
3368    List<Cell> expected = new ArrayList<>();
3369    expected.add(kv13);
3370    expected.add(kv12);
3371    expected.add(kv23);
3372    expected.add(kv22);
3373
3374    Scan scan = new Scan(row1);
3375    scan.addFamily(fam1);
3376    scan.setMaxVersions(MAX_VERSIONS);
3377    List<Cell> actual = new ArrayList<>();
3378    InternalScanner scanner = region.getScanner(scan);
3379
3380    boolean hasNext = scanner.next(actual);
3381    assertEquals(false, hasNext);
3382
3383    // Verify result
3384    for (int i = 0; i < expected.size(); i++) {
3385      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3386    }
3387  }
3388
3389  @Test
3390  public void testScanner_StopRow1542() throws IOException {
3391    byte[] family = Bytes.toBytes("testFamily");
3392    this.region = initHRegion(tableName, method, CONF, family);
3393    byte[] row1 = Bytes.toBytes("row111");
3394    byte[] row2 = Bytes.toBytes("row222");
3395    byte[] row3 = Bytes.toBytes("row333");
3396    byte[] row4 = Bytes.toBytes("row444");
3397    byte[] row5 = Bytes.toBytes("row555");
3398
3399    byte[] col1 = Bytes.toBytes("Pub111");
3400    byte[] col2 = Bytes.toBytes("Pub222");
3401
3402    Put put = new Put(row1);
3403    put.addColumn(family, col1, Bytes.toBytes(10L));
3404    region.put(put);
3405
3406    put = new Put(row2);
3407    put.addColumn(family, col1, Bytes.toBytes(15L));
3408    region.put(put);
3409
3410    put = new Put(row3);
3411    put.addColumn(family, col2, Bytes.toBytes(20L));
3412    region.put(put);
3413
3414    put = new Put(row4);
3415    put.addColumn(family, col2, Bytes.toBytes(30L));
3416    region.put(put);
3417
3418    put = new Put(row5);
3419    put.addColumn(family, col1, Bytes.toBytes(40L));
3420    region.put(put);
3421
3422    Scan scan = new Scan(row3, row4);
3423    scan.setMaxVersions();
3424    scan.addColumn(family, col1);
3425    InternalScanner s = region.getScanner(scan);
3426
3427    List<Cell> results = new ArrayList<>();
3428    assertEquals(false, s.next(results));
3429    assertEquals(0, results.size());
3430  }
3431
3432  @Test
3433  public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
3434    byte[] row1 = Bytes.toBytes("row1");
3435    byte[] fam1 = Bytes.toBytes("fam1");
3436    byte[] qf1 = Bytes.toBytes("qualifier1");
3437    byte[] qf2 = Bytes.toBytes("quateslifier2");
3438
3439    long ts1 = 1;
3440    long ts2 = ts1 + 1;
3441    long ts3 = ts1 + 2;
3442    long ts4 = ts1 + 3;
3443
3444    // Setting up region
3445    this.region = initHRegion(tableName, method, CONF, fam1);
3446    // Putting data in Region
3447    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3448    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3449    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3450    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3451
3452    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3453    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3454    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3455    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3456
3457    Put put = null;
3458    put = new Put(row1);
3459    put.add(kv14);
3460    put.add(kv24);
3461    region.put(put);
3462    region.flush(true);
3463
3464    put = new Put(row1);
3465    put.add(kv23);
3466    put.add(kv13);
3467    region.put(put);
3468    region.flush(true);
3469
3470    put = new Put(row1);
3471    put.add(kv22);
3472    put.add(kv12);
3473    region.put(put);
3474    region.flush(true);
3475
3476    put = new Put(row1);
3477    put.add(kv21);
3478    put.add(kv11);
3479    region.put(put);
3480
3481    // Expected
3482    List<KeyValue> expected = new ArrayList<>();
3483    expected.add(kv14);
3484    expected.add(kv13);
3485    expected.add(kv12);
3486    expected.add(kv24);
3487    expected.add(kv23);
3488    expected.add(kv22);
3489
3490    Scan scan = new Scan(row1);
3491    int versions = 3;
3492    scan.setMaxVersions(versions);
3493    List<Cell> actual = new ArrayList<>();
3494    InternalScanner scanner = region.getScanner(scan);
3495
3496    boolean hasNext = scanner.next(actual);
3497    assertEquals(false, hasNext);
3498
3499    // Verify result
3500    for (int i = 0; i < expected.size(); i++) {
3501      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3502    }
3503  }
3504
3505  /**
3506   * Added for HBASE-5416
3507   *
3508   * Here we test scan optimization when only subset of CFs are used in filter
3509   * conditions.
3510   */
3511  @Test
3512  public void testScanner_JoinedScanners() throws IOException {
3513    byte[] cf_essential = Bytes.toBytes("essential");
3514    byte[] cf_joined = Bytes.toBytes("joined");
3515    byte[] cf_alpha = Bytes.toBytes("alpha");
3516    this.region = initHRegion(tableName, method, CONF, cf_essential, cf_joined, cf_alpha);
3517    byte[] row1 = Bytes.toBytes("row1");
3518    byte[] row2 = Bytes.toBytes("row2");
3519    byte[] row3 = Bytes.toBytes("row3");
3520
3521    byte[] col_normal = Bytes.toBytes("d");
3522    byte[] col_alpha = Bytes.toBytes("a");
3523
3524    byte[] filtered_val = Bytes.toBytes(3);
3525
3526    Put put = new Put(row1);
3527    put.addColumn(cf_essential, col_normal, Bytes.toBytes(1));
3528    put.addColumn(cf_joined, col_alpha, Bytes.toBytes(1));
3529    region.put(put);
3530
3531    put = new Put(row2);
3532    put.addColumn(cf_essential, col_alpha, Bytes.toBytes(2));
3533    put.addColumn(cf_joined, col_normal, Bytes.toBytes(2));
3534    put.addColumn(cf_alpha, col_alpha, Bytes.toBytes(2));
3535    region.put(put);
3536
3537    put = new Put(row3);
3538    put.addColumn(cf_essential, col_normal, filtered_val);
3539    put.addColumn(cf_joined, col_normal, filtered_val);
3540    region.put(put);
3541
3542    // Check two things:
3543    // 1. result list contains expected values
3544    // 2. result list is sorted properly
3545
3546    Scan scan = new Scan();
3547    Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
3548        CompareOp.NOT_EQUAL, filtered_val);
3549    scan.setFilter(filter);
3550    scan.setLoadColumnFamiliesOnDemand(true);
3551    InternalScanner s = region.getScanner(scan);
3552
3553    List<Cell> results = new ArrayList<>();
3554    assertTrue(s.next(results));
3555    assertEquals(1, results.size());
3556    results.clear();
3557
3558    assertTrue(s.next(results));
3559    assertEquals(3, results.size());
3560    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha));
3561    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential));
3562    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined));
3563    results.clear();
3564
3565    assertFalse(s.next(results));
3566    assertEquals(0, results.size());
3567  }
3568
3569  /**
3570   * HBASE-5416
3571   *
3572   * Test case when scan limits amount of KVs returned on each next() call.
3573   */
3574  @Test
3575  public void testScanner_JoinedScannersWithLimits() throws IOException {
3576    final byte[] cf_first = Bytes.toBytes("first");
3577    final byte[] cf_second = Bytes.toBytes("second");
3578
3579    this.region = initHRegion(tableName, method, CONF, cf_first, cf_second);
3580    final byte[] col_a = Bytes.toBytes("a");
3581    final byte[] col_b = Bytes.toBytes("b");
3582
3583    Put put;
3584
3585    for (int i = 0; i < 10; i++) {
3586      put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
3587      put.addColumn(cf_first, col_a, Bytes.toBytes(i));
3588      if (i < 5) {
3589        put.addColumn(cf_first, col_b, Bytes.toBytes(i));
3590        put.addColumn(cf_second, col_a, Bytes.toBytes(i));
3591        put.addColumn(cf_second, col_b, Bytes.toBytes(i));
3592      }
3593      region.put(put);
3594    }
3595
3596    Scan scan = new Scan();
3597    scan.setLoadColumnFamiliesOnDemand(true);
3598    Filter bogusFilter = new FilterBase() {
3599      @Override
3600      public ReturnCode filterCell(final Cell ignored) throws IOException {
3601        return ReturnCode.INCLUDE;
3602      }
3603      @Override
3604      public boolean isFamilyEssential(byte[] name) {
3605        return Bytes.equals(name, cf_first);
3606      }
3607    };
3608
3609    scan.setFilter(bogusFilter);
3610    InternalScanner s = region.getScanner(scan);
3611
3612    // Our data looks like this:
3613    // r0: first:a, first:b, second:a, second:b
3614    // r1: first:a, first:b, second:a, second:b
3615    // r2: first:a, first:b, second:a, second:b
3616    // r3: first:a, first:b, second:a, second:b
3617    // r4: first:a, first:b, second:a, second:b
3618    // r5: first:a
3619    // r6: first:a
3620    // r7: first:a
3621    // r8: first:a
3622    // r9: first:a
3623
3624    // But due to next's limit set to 3, we should get this:
3625    // r0: first:a, first:b, second:a
3626    // r0: second:b
3627    // r1: first:a, first:b, second:a
3628    // r1: second:b
3629    // r2: first:a, first:b, second:a
3630    // r2: second:b
3631    // r3: first:a, first:b, second:a
3632    // r3: second:b
3633    // r4: first:a, first:b, second:a
3634    // r4: second:b
3635    // r5: first:a
3636    // r6: first:a
3637    // r7: first:a
3638    // r8: first:a
3639    // r9: first:a
3640
3641    List<Cell> results = new ArrayList<>();
3642    int index = 0;
3643    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build();
3644    while (true) {
3645      boolean more = s.next(results, scannerContext);
3646      if ((index >> 1) < 5) {
3647        if (index % 2 == 0) {
3648          assertEquals(3, results.size());
3649        } else {
3650          assertEquals(1, results.size());
3651        }
3652      } else {
3653        assertEquals(1, results.size());
3654      }
3655      results.clear();
3656      index++;
3657      if (!more) {
3658        break;
3659      }
3660    }
3661  }
3662
3663  /**
3664   * Write an HFile block full with Cells whose qualifier that are identical between
3665   * 0 and Short.MAX_VALUE. See HBASE-13329.
3666   * @throws Exception
3667   */
3668  @Test
3669  public void testLongQualifier() throws Exception {
3670    byte[] family = Bytes.toBytes("family");
3671    this.region = initHRegion(tableName, method, CONF, family);
3672    byte[] q = new byte[Short.MAX_VALUE+2];
3673    Arrays.fill(q, 0, q.length-1, (byte)42);
3674    for (byte i=0; i<10; i++) {
3675      Put p = new Put(Bytes.toBytes("row"));
3676      // qualifiers that differ past Short.MAX_VALUE
3677      q[q.length-1]=i;
3678      p.addColumn(family, q, q);
3679      region.put(p);
3680    }
3681    region.flush(false);
3682  }
3683
3684  /**
3685   * Flushes the cache in a thread while scanning. The tests verify that the
3686   * scan is coherent - e.g. the returned results are always of the same or
3687   * later update as the previous results.
3688   *
3689   * @throws IOException
3690   *           scan / compact
3691   * @throws InterruptedException
3692   *           thread join
3693   */
3694  @Test
3695  public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
3696    byte[] family = Bytes.toBytes("family");
3697    int numRows = 1000;
3698    int flushAndScanInterval = 10;
3699    int compactInterval = 10 * flushAndScanInterval;
3700
3701    this.region = initHRegion(tableName, method, CONF, family);
3702    FlushThread flushThread = new FlushThread();
3703    try {
3704      flushThread.start();
3705
3706      Scan scan = new Scan();
3707      scan.addFamily(family);
3708      scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOp.EQUAL,
3709          new BinaryComparator(Bytes.toBytes(5L))));
3710
3711      int expectedCount = 0;
3712      List<Cell> res = new ArrayList<>();
3713
3714      boolean toggle = true;
3715      for (long i = 0; i < numRows; i++) {
3716        Put put = new Put(Bytes.toBytes(i));
3717        put.setDurability(Durability.SKIP_WAL);
3718        put.addColumn(family, qual1, Bytes.toBytes(i % 10));
3719        region.put(put);
3720
3721        if (i != 0 && i % compactInterval == 0) {
3722          LOG.debug("iteration = " + i+ " ts="+System.currentTimeMillis());
3723          region.compact(true);
3724        }
3725
3726        if (i % 10 == 5L) {
3727          expectedCount++;
3728        }
3729
3730        if (i != 0 && i % flushAndScanInterval == 0) {
3731          res.clear();
3732          InternalScanner scanner = region.getScanner(scan);
3733          if (toggle) {
3734            flushThread.flush();
3735          }
3736          while (scanner.next(res))
3737            ;
3738          if (!toggle) {
3739            flushThread.flush();
3740          }
3741          assertEquals("toggle="+toggle+"i=" + i + " ts="+System.currentTimeMillis(),
3742              expectedCount, res.size());
3743          toggle = !toggle;
3744        }
3745      }
3746
3747    } finally {
3748      try {
3749        flushThread.done();
3750        flushThread.join();
3751        flushThread.checkNoError();
3752      } catch (InterruptedException ie) {
3753        LOG.warn("Caught exception when joining with flushThread", ie);
3754      }
3755      HBaseTestingUtility.closeRegionAndWAL(this.region);
3756      this.region = null;
3757    }
3758  }
3759
3760  protected class FlushThread extends Thread {
3761    private volatile boolean done;
3762    private Throwable error = null;
3763
3764    FlushThread() {
3765      super("FlushThread");
3766    }
3767
3768    public void done() {
3769      done = true;
3770      synchronized (this) {
3771        interrupt();
3772      }
3773    }
3774
3775    public void checkNoError() {
3776      if (error != null) {
3777        assertNull(error);
3778      }
3779    }
3780
3781    @Override
3782    public void run() {
3783      done = false;
3784      while (!done) {
3785        synchronized (this) {
3786          try {
3787            wait();
3788          } catch (InterruptedException ignored) {
3789            if (done) {
3790              break;
3791            }
3792          }
3793        }
3794        try {
3795          region.flush(true);
3796        } catch (IOException e) {
3797          if (!done) {
3798            LOG.error("Error while flushing cache", e);
3799            error = e;
3800          }
3801          break;
3802        } catch (Throwable t) {
3803          LOG.error("Uncaught exception", t);
3804          throw t;
3805        }
3806      }
3807    }
3808
3809    public void flush() {
3810      synchronized (this) {
3811        notify();
3812      }
3813    }
3814  }
3815
3816  /**
3817   * So can be overridden in subclasses.
3818   */
3819  int getNumQualifiersForTestWritesWhileScanning() {
3820    return 100;
3821  }
3822
3823  /**
3824   * So can be overridden in subclasses.
3825   */
3826  int getTestCountForTestWritesWhileScanning() {
3827    return 100;
3828  }
3829
3830  /**
3831   * Writes very wide records and scans for the latest every time.. Flushes and
3832   * compacts the region every now and then to keep things realistic.
3833   *
3834   * @throws IOException
3835   *           by flush / scan / compaction
3836   * @throws InterruptedException
3837   *           when joining threads
3838   */
3839  @Test
3840  public void testWritesWhileScanning() throws IOException, InterruptedException {
3841    int testCount = getTestCountForTestWritesWhileScanning();
3842    int numRows = 1;
3843    int numFamilies = 10;
3844    int numQualifiers = getNumQualifiersForTestWritesWhileScanning();
3845    int flushInterval = 7;
3846    int compactInterval = 5 * flushInterval;
3847    byte[][] families = new byte[numFamilies][];
3848    for (int i = 0; i < numFamilies; i++) {
3849      families[i] = Bytes.toBytes("family" + i);
3850    }
3851    byte[][] qualifiers = new byte[numQualifiers][];
3852    for (int i = 0; i < numQualifiers; i++) {
3853      qualifiers[i] = Bytes.toBytes("qual" + i);
3854    }
3855
3856    this.region = initHRegion(tableName, method, CONF, families);
3857    FlushThread flushThread = new FlushThread();
3858    PutThread putThread = new PutThread(numRows, families, qualifiers);
3859    try {
3860      putThread.start();
3861      putThread.waitForFirstPut();
3862
3863      flushThread.start();
3864
3865      Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
3866
3867      int expectedCount = numFamilies * numQualifiers;
3868      List<Cell> res = new ArrayList<>();
3869
3870      long prevTimestamp = 0L;
3871      for (int i = 0; i < testCount; i++) {
3872
3873        if (i != 0 && i % compactInterval == 0) {
3874          region.compact(true);
3875          for (HStore store : region.getStores()) {
3876            store.closeAndArchiveCompactedFiles();
3877          }
3878        }
3879
3880        if (i != 0 && i % flushInterval == 0) {
3881          flushThread.flush();
3882        }
3883
3884        boolean previousEmpty = res.isEmpty();
3885        res.clear();
3886        InternalScanner scanner = region.getScanner(scan);
3887        while (scanner.next(res))
3888          ;
3889        if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
3890          assertEquals("i=" + i, expectedCount, res.size());
3891          long timestamp = res.get(0).getTimestamp();
3892          assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
3893              timestamp >= prevTimestamp);
3894          prevTimestamp = timestamp;
3895        }
3896      }
3897
3898      putThread.done();
3899
3900      region.flush(true);
3901
3902    } finally {
3903      try {
3904        flushThread.done();
3905        flushThread.join();
3906        flushThread.checkNoError();
3907
3908        putThread.join();
3909        putThread.checkNoError();
3910      } catch (InterruptedException ie) {
3911        LOG.warn("Caught exception when joining with flushThread", ie);
3912      }
3913
3914      try {
3915          HBaseTestingUtility.closeRegionAndWAL(this.region);
3916      } catch (DroppedSnapshotException dse) {
3917        // We could get this on way out because we interrupt the background flusher and it could
3918        // fail anywhere causing a DSE over in the background flusher... only it is not properly
3919        // dealt with so could still be memory hanging out when we get to here -- memory we can't
3920        // flush because the accounting is 'off' since original DSE.
3921      }
3922      this.region = null;
3923    }
3924  }
3925
3926  protected class PutThread extends Thread {
3927    private volatile boolean done;
3928    private volatile int numPutsFinished = 0;
3929
3930    private Throwable error = null;
3931    private int numRows;
3932    private byte[][] families;
3933    private byte[][] qualifiers;
3934
3935    private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
3936      super("PutThread");
3937      this.numRows = numRows;
3938      this.families = families;
3939      this.qualifiers = qualifiers;
3940    }
3941
3942    /**
3943     * Block calling thread until this instance of PutThread has put at least one row.
3944     */
3945    public void waitForFirstPut() throws InterruptedException {
3946      // wait until put thread actually puts some data
3947      while (isAlive() && numPutsFinished == 0) {
3948        checkNoError();
3949        Thread.sleep(50);
3950      }
3951    }
3952
3953    public void done() {
3954      done = true;
3955      synchronized (this) {
3956        interrupt();
3957      }
3958    }
3959
3960    public void checkNoError() {
3961      if (error != null) {
3962        assertNull(error);
3963      }
3964    }
3965
3966    @Override
3967    public void run() {
3968      done = false;
3969      while (!done) {
3970        try {
3971          for (int r = 0; r < numRows; r++) {
3972            byte[] row = Bytes.toBytes("row" + r);
3973            Put put = new Put(row);
3974            put.setDurability(Durability.SKIP_WAL);
3975            byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
3976            for (byte[] family : families) {
3977              for (byte[] qualifier : qualifiers) {
3978                put.addColumn(family, qualifier, numPutsFinished, value);
3979              }
3980            }
3981            region.put(put);
3982            numPutsFinished++;
3983            if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
3984              System.out.println("put iteration = " + numPutsFinished);
3985              Delete delete = new Delete(row, (long) numPutsFinished - 30);
3986              region.delete(delete);
3987            }
3988            numPutsFinished++;
3989          }
3990        } catch (InterruptedIOException e) {
3991          // This is fine. It means we are done, or didn't get the lock on time
3992          LOG.info("Interrupted", e);
3993        } catch (IOException e) {
3994          LOG.error("Error while putting records", e);
3995          error = e;
3996          break;
3997        }
3998      }
3999
4000    }
4001
4002  }
4003
4004  /**
4005   * Writes very wide records and gets the latest row every time.. Flushes and
4006   * compacts the region aggressivly to catch issues.
4007   *
4008   * @throws IOException
4009   *           by flush / scan / compaction
4010   * @throws InterruptedException
4011   *           when joining threads
4012   */
4013  @Test
4014  public void testWritesWhileGetting() throws Exception {
4015    int testCount = 50;
4016    int numRows = 1;
4017    int numFamilies = 10;
4018    int numQualifiers = 100;
4019    int compactInterval = 100;
4020    byte[][] families = new byte[numFamilies][];
4021    for (int i = 0; i < numFamilies; i++) {
4022      families[i] = Bytes.toBytes("family" + i);
4023    }
4024    byte[][] qualifiers = new byte[numQualifiers][];
4025    for (int i = 0; i < numQualifiers; i++) {
4026      qualifiers[i] = Bytes.toBytes("qual" + i);
4027    }
4028
4029
4030    // This test flushes constantly and can cause many files to be created,
4031    // possibly
4032    // extending over the ulimit. Make sure compactions are aggressive in
4033    // reducing
4034    // the number of HFiles created.
4035    Configuration conf = HBaseConfiguration.create(CONF);
4036    conf.setInt("hbase.hstore.compaction.min", 1);
4037    conf.setInt("hbase.hstore.compaction.max", 1000);
4038    this.region = initHRegion(tableName, method, conf, families);
4039    PutThread putThread = null;
4040    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
4041    try {
4042      putThread = new PutThread(numRows, families, qualifiers);
4043      putThread.start();
4044      putThread.waitForFirstPut();
4045
4046      // Add a thread that flushes as fast as possible
4047      ctx.addThread(new RepeatingTestThread(ctx) {
4048
4049        @Override
4050        public void doAnAction() throws Exception {
4051          region.flush(true);
4052          // Compact regularly to avoid creating too many files and exceeding
4053          // the ulimit.
4054          region.compact(false);
4055          for (HStore store : region.getStores()) {
4056            store.closeAndArchiveCompactedFiles();
4057          }
4058        }
4059      });
4060      ctx.startThreads();
4061
4062      Get get = new Get(Bytes.toBytes("row0"));
4063      Result result = null;
4064
4065      int expectedCount = numFamilies * numQualifiers;
4066
4067      long prevTimestamp = 0L;
4068      for (int i = 0; i < testCount; i++) {
4069        LOG.info("testWritesWhileGetting verify turn " + i);
4070        boolean previousEmpty = result == null || result.isEmpty();
4071        result = region.get(get);
4072        if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
4073          assertEquals("i=" + i, expectedCount, result.size());
4074          // TODO this was removed, now what dangit?!
4075          // search looking for the qualifier in question?
4076          long timestamp = 0;
4077          for (Cell kv : result.rawCells()) {
4078            if (CellUtil.matchingFamily(kv, families[0])
4079                && CellUtil.matchingQualifier(kv, qualifiers[0])) {
4080              timestamp = kv.getTimestamp();
4081            }
4082          }
4083          assertTrue(timestamp >= prevTimestamp);
4084          prevTimestamp = timestamp;
4085          Cell previousKV = null;
4086
4087          for (Cell kv : result.rawCells()) {
4088            byte[] thisValue = CellUtil.cloneValue(kv);
4089            if (previousKV != null) {
4090              if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) {
4091                LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV
4092                    + "(memStoreTS:" + previousKV.getSequenceId() + ")" + ", New KV: " + kv
4093                    + "(memStoreTS:" + kv.getSequenceId() + ")");
4094                assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue));
4095              }
4096            }
4097            previousKV = kv;
4098          }
4099        }
4100      }
4101    } finally {
4102      if (putThread != null)
4103        putThread.done();
4104
4105      region.flush(true);
4106
4107      if (putThread != null) {
4108        putThread.join();
4109        putThread.checkNoError();
4110      }
4111
4112      ctx.stop();
4113      HBaseTestingUtility.closeRegionAndWAL(this.region);
4114      this.region = null;
4115    }
4116  }
4117
4118  @Test
4119  public void testHolesInMeta() throws Exception {
4120    byte[] family = Bytes.toBytes("family");
4121    this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
4122        false, family);
4123    byte[] rowNotServed = Bytes.toBytes("a");
4124    Get g = new Get(rowNotServed);
4125    try {
4126      region.get(g);
4127      fail();
4128    } catch (WrongRegionException x) {
4129      // OK
4130    }
4131    byte[] row = Bytes.toBytes("y");
4132    g = new Get(row);
4133    region.get(g);
4134  }
4135
4136  @Test
4137  public void testIndexesScanWithOneDeletedRow() throws IOException {
4138    byte[] family = Bytes.toBytes("family");
4139
4140    // Setting up region
4141    this.region = initHRegion(tableName, method, CONF, family);
4142    Put put = new Put(Bytes.toBytes(1L));
4143    put.addColumn(family, qual1, 1L, Bytes.toBytes(1L));
4144    region.put(put);
4145
4146    region.flush(true);
4147
4148    Delete delete = new Delete(Bytes.toBytes(1L), 1L);
4149    region.delete(delete);
4150
4151    put = new Put(Bytes.toBytes(2L));
4152    put.addColumn(family, qual1, 2L, Bytes.toBytes(2L));
4153    region.put(put);
4154
4155    Scan idxScan = new Scan();
4156    idxScan.addFamily(family);
4157    idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList(
4158        new SingleColumnValueFilter(family, qual1, CompareOp.GREATER_OR_EQUAL,
4159            new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1,
4160            CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L))))));
4161    InternalScanner scanner = region.getScanner(idxScan);
4162    List<Cell> res = new ArrayList<>();
4163
4164    while (scanner.next(res)) {
4165      // Ignore res value.
4166    }
4167    assertEquals(1L, res.size());
4168  }
4169
4170  // ////////////////////////////////////////////////////////////////////////////
4171  // Bloom filter test
4172  // ////////////////////////////////////////////////////////////////////////////
4173  @Test
4174  public void testBloomFilterSize() throws IOException {
4175    byte[] fam1 = Bytes.toBytes("fam1");
4176    byte[] qf1 = Bytes.toBytes("col");
4177    byte[] val1 = Bytes.toBytes("value1");
4178    // Create Table
4179    HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE)
4180        .setBloomFilterType(BloomType.ROWCOL);
4181
4182    HTableDescriptor htd = new HTableDescriptor(tableName);
4183    htd.addFamily(hcd);
4184    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4185    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4186    int num_unique_rows = 10;
4187    int duplicate_multiplier = 2;
4188    int num_storefiles = 4;
4189
4190    int version = 0;
4191    for (int f = 0; f < num_storefiles; f++) {
4192      for (int i = 0; i < duplicate_multiplier; i++) {
4193        for (int j = 0; j < num_unique_rows; j++) {
4194          Put put = new Put(Bytes.toBytes("row" + j));
4195          put.setDurability(Durability.SKIP_WAL);
4196          long ts = version++;
4197          put.addColumn(fam1, qf1, ts, val1);
4198          region.put(put);
4199        }
4200      }
4201      region.flush(true);
4202    }
4203    // before compaction
4204    HStore store = region.getStore(fam1);
4205    Collection<HStoreFile> storeFiles = store.getStorefiles();
4206    for (HStoreFile storefile : storeFiles) {
4207      StoreFileReader reader = storefile.getReader();
4208      reader.loadFileInfo();
4209      reader.loadBloomfilter();
4210      assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
4211      assertEquals(num_unique_rows, reader.getFilterEntries());
4212    }
4213
4214    region.compact(true);
4215
4216    // after compaction
4217    storeFiles = store.getStorefiles();
4218    for (HStoreFile storefile : storeFiles) {
4219      StoreFileReader reader = storefile.getReader();
4220      reader.loadFileInfo();
4221      reader.loadBloomfilter();
4222      assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());
4223      assertEquals(num_unique_rows, reader.getFilterEntries());
4224    }
4225  }
4226
4227  @Test
4228  public void testAllColumnsWithBloomFilter() throws IOException {
4229    byte[] TABLE = Bytes.toBytes(name.getMethodName());
4230    byte[] FAMILY = Bytes.toBytes("family");
4231
4232    // Create table
4233    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE)
4234        .setBloomFilterType(BloomType.ROWCOL);
4235    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
4236    htd.addFamily(hcd);
4237    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4238    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4239    // For row:0, col:0: insert versions 1 through 5.
4240    byte[] row = Bytes.toBytes("row:" + 0);
4241    byte[] column = Bytes.toBytes("column:" + 0);
4242    Put put = new Put(row);
4243    put.setDurability(Durability.SKIP_WAL);
4244    for (long idx = 1; idx <= 4; idx++) {
4245      put.addColumn(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx));
4246    }
4247    region.put(put);
4248
4249    // Flush
4250    region.flush(true);
4251
4252    // Get rows
4253    Get get = new Get(row);
4254    get.setMaxVersions();
4255    Cell[] kvs = region.get(get).rawCells();
4256
4257    // Check if rows are correct
4258    assertEquals(4, kvs.length);
4259    checkOneCell(kvs[0], FAMILY, 0, 0, 4);
4260    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
4261    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
4262    checkOneCell(kvs[3], FAMILY, 0, 0, 1);
4263  }
4264
4265  /**
4266   * Testcase to cover bug-fix for HBASE-2823 Ensures correct delete when
4267   * issuing delete row on columns with bloom filter set to row+col
4268   * (BloomType.ROWCOL)
4269   */
4270  @Test
4271  public void testDeleteRowWithBloomFilter() throws IOException {
4272    byte[] familyName = Bytes.toBytes("familyName");
4273
4274    // Create Table
4275    HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE)
4276        .setBloomFilterType(BloomType.ROWCOL);
4277
4278    HTableDescriptor htd = new HTableDescriptor(tableName);
4279    htd.addFamily(hcd);
4280    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4281    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4282    // Insert some data
4283    byte[] row = Bytes.toBytes("row1");
4284    byte[] col = Bytes.toBytes("col1");
4285
4286    Put put = new Put(row);
4287    put.addColumn(familyName, col, 1, Bytes.toBytes("SomeRandomValue"));
4288    region.put(put);
4289    region.flush(true);
4290
4291    Delete del = new Delete(row);
4292    region.delete(del);
4293    region.flush(true);
4294
4295    // Get remaining rows (should have none)
4296    Get get = new Get(row);
4297    get.addColumn(familyName, col);
4298
4299    Cell[] keyValues = region.get(get).rawCells();
4300    assertEquals(0, keyValues.length);
4301  }
4302
4303  @Test
4304  public void testgetHDFSBlocksDistribution() throws Exception {
4305    HBaseTestingUtility htu = new HBaseTestingUtility();
4306    // Why do we set the block size in this test?  If we set it smaller than the kvs, then we'll
4307    // break up the file in to more pieces that can be distributed across the three nodes and we
4308    // won't be able to have the condition this test asserts; that at least one node has
4309    // a copy of all replicas -- if small block size, then blocks are spread evenly across the
4310    // the three nodes.  hfilev3 with tags seems to put us over the block size.  St.Ack.
4311    // final int DEFAULT_BLOCK_SIZE = 1024;
4312    // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
4313    htu.getConfiguration().setInt("dfs.replication", 2);
4314
4315    // set up a cluster with 3 nodes
4316    MiniHBaseCluster cluster = null;
4317    String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
4318    int regionServersCount = 3;
4319
4320    try {
4321      StartMiniClusterOption option = StartMiniClusterOption.builder()
4322          .numRegionServers(regionServersCount).dataNodeHosts(dataNodeHosts).build();
4323      cluster = htu.startMiniCluster(option);
4324      byte[][] families = { fam1, fam2 };
4325      Table ht = htu.createTable(tableName, families);
4326
4327      // Setting up region
4328      byte row[] = Bytes.toBytes("row1");
4329      byte col[] = Bytes.toBytes("col1");
4330
4331      Put put = new Put(row);
4332      put.addColumn(fam1, col, 1, Bytes.toBytes("test1"));
4333      put.addColumn(fam2, col, 1, Bytes.toBytes("test2"));
4334      ht.put(put);
4335
4336      HRegion firstRegion = htu.getHBaseCluster().getRegions(tableName).get(0);
4337      firstRegion.flush(true);
4338      HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution();
4339
4340      // Given the default replication factor is 2 and we have 2 HFiles,
4341      // we will have total of 4 replica of blocks on 3 datanodes; thus there
4342      // must be at least one host that have replica for 2 HFiles. That host's
4343      // weight will be equal to the unique block weight.
4344      long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight();
4345      StringBuilder sb = new StringBuilder();
4346      for (String host: blocksDistribution1.getTopHosts()) {
4347        if (sb.length() > 0) sb.append(", ");
4348        sb.append(host);
4349        sb.append("=");
4350        sb.append(blocksDistribution1.getWeight(host));
4351      }
4352
4353      String topHost = blocksDistribution1.getTopHosts().get(0);
4354      long topHostWeight = blocksDistribution1.getWeight(topHost);
4355      String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" +
4356        topHostWeight + ", topHost=" + topHost + "; " + sb.toString();
4357      LOG.info(msg);
4358      assertTrue(msg, uniqueBlocksWeight1 == topHostWeight);
4359
4360      // use the static method to compute the value, it should be the same.
4361      // static method is used by load balancer or other components
4362      HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(
4363          htu.getConfiguration(), firstRegion.getTableDescriptor(), firstRegion.getRegionInfo());
4364      long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight();
4365
4366      assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
4367
4368      ht.close();
4369    } finally {
4370      if (cluster != null) {
4371        htu.shutdownMiniCluster();
4372      }
4373    }
4374  }
4375
4376  /**
4377   * Testcase to check state of region initialization task set to ABORTED or not
4378   * if any exceptions during initialization
4379   *
4380   * @throws Exception
4381   */
4382  @Test
4383  public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception {
4384    HRegionInfo info;
4385    try {
4386      FileSystem fs = Mockito.mock(FileSystem.class);
4387      Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException());
4388      HTableDescriptor htd = new HTableDescriptor(tableName);
4389      htd.addFamily(new HColumnDescriptor("cf"));
4390      info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
4391          HConstants.EMPTY_BYTE_ARRAY, false);
4392      Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
4393      region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
4394      // region initialization throws IOException and set task state to ABORTED.
4395      region.initialize();
4396      fail("Region initialization should fail due to IOException");
4397    } catch (IOException io) {
4398      List<MonitoredTask> tasks = TaskMonitor.get().getTasks();
4399      for (MonitoredTask monitoredTask : tasks) {
4400        if (!(monitoredTask instanceof MonitoredRPCHandler)
4401            && monitoredTask.getDescription().contains(region.toString())) {
4402          assertTrue("Region state should be ABORTED.",
4403              monitoredTask.getState().equals(MonitoredTask.State.ABORTED));
4404          break;
4405        }
4406      }
4407    }
4408  }
4409
4410  /**
4411   * Verifies that the .regioninfo file is written on region creation and that
4412   * is recreated if missing during region opening.
4413   */
4414  @Test
4415  public void testRegionInfoFileCreation() throws IOException {
4416    Path rootDir = new Path(dir + "testRegionInfoFileCreation");
4417
4418    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4419    htd.addFamily(new HColumnDescriptor("cf"));
4420
4421    HRegionInfo hri = new HRegionInfo(htd.getTableName());
4422
4423    // Create a region and skip the initialization (like CreateTableHandler)
4424    region = HBaseTestingUtility.createRegionAndWAL(hri, rootDir, CONF, htd, false);
4425    Path regionDir = region.getRegionFileSystem().getRegionDir();
4426    FileSystem fs = region.getRegionFileSystem().getFileSystem();
4427    HBaseTestingUtility.closeRegionAndWAL(region);
4428
4429    Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
4430
4431    // Verify that the .regioninfo file is present
4432    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4433        fs.exists(regionInfoFile));
4434
4435    // Try to open the region
4436    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4437    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4438    HBaseTestingUtility.closeRegionAndWAL(region);
4439
4440    // Verify that the .regioninfo file is still there
4441    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4442        fs.exists(regionInfoFile));
4443
4444    // Remove the .regioninfo file and verify is recreated on region open
4445    fs.delete(regionInfoFile, true);
4446    assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
4447        fs.exists(regionInfoFile));
4448
4449    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4450//    region = TEST_UTIL.openHRegion(hri, htd);
4451    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4452    HBaseTestingUtility.closeRegionAndWAL(region);
4453
4454    // Verify that the .regioninfo file is still there
4455    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4456        fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE)));
4457
4458    region = null;
4459  }
4460
4461  /**
4462   * TestCase for increment
4463   */
4464  private static class Incrementer implements Runnable {
4465    private HRegion region;
4466    private final static byte[] incRow = Bytes.toBytes("incRow");
4467    private final static byte[] family = Bytes.toBytes("family");
4468    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4469    private final static long ONE = 1L;
4470    private int incCounter;
4471
4472    public Incrementer(HRegion region, int incCounter) {
4473      this.region = region;
4474      this.incCounter = incCounter;
4475    }
4476
4477    @Override
4478    public void run() {
4479      int count = 0;
4480      while (count < incCounter) {
4481        Increment inc = new Increment(incRow);
4482        inc.addColumn(family, qualifier, ONE);
4483        count++;
4484        try {
4485          region.increment(inc);
4486        } catch (IOException e) {
4487          LOG.info("Count=" + count + ", " + e);
4488          break;
4489        }
4490      }
4491    }
4492  }
4493
4494  /**
4495   * Test case to check increment function with memstore flushing
4496   * @throws Exception
4497   */
4498  @Test
4499  public void testParallelIncrementWithMemStoreFlush() throws Exception {
4500    byte[] family = Incrementer.family;
4501    this.region = initHRegion(tableName, method, CONF, family);
4502    final HRegion region = this.region;
4503    final AtomicBoolean incrementDone = new AtomicBoolean(false);
4504    Runnable flusher = new Runnable() {
4505      @Override
4506      public void run() {
4507        while (!incrementDone.get()) {
4508          try {
4509            region.flush(true);
4510          } catch (Exception e) {
4511            e.printStackTrace();
4512          }
4513        }
4514      }
4515    };
4516
4517    // after all increment finished, the row will increment to 20*100 = 2000
4518    int threadNum = 20;
4519    int incCounter = 100;
4520    long expected = (long) threadNum * incCounter;
4521    Thread[] incrementers = new Thread[threadNum];
4522    Thread flushThread = new Thread(flusher);
4523    for (int i = 0; i < threadNum; i++) {
4524      incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
4525      incrementers[i].start();
4526    }
4527    flushThread.start();
4528    for (int i = 0; i < threadNum; i++) {
4529      incrementers[i].join();
4530    }
4531
4532    incrementDone.set(true);
4533    flushThread.join();
4534
4535    Get get = new Get(Incrementer.incRow);
4536    get.addColumn(Incrementer.family, Incrementer.qualifier);
4537    get.setMaxVersions(1);
4538    Result res = this.region.get(get);
4539    List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
4540
4541    // we just got the latest version
4542    assertEquals(1, kvs.size());
4543    Cell kv = kvs.get(0);
4544    assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
4545  }
4546
4547  /**
4548   * TestCase for append
4549   */
4550  private static class Appender implements Runnable {
4551    private HRegion region;
4552    private final static byte[] appendRow = Bytes.toBytes("appendRow");
4553    private final static byte[] family = Bytes.toBytes("family");
4554    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4555    private final static byte[] CHAR = Bytes.toBytes("a");
4556    private int appendCounter;
4557
4558    public Appender(HRegion region, int appendCounter) {
4559      this.region = region;
4560      this.appendCounter = appendCounter;
4561    }
4562
4563    @Override
4564    public void run() {
4565      int count = 0;
4566      while (count < appendCounter) {
4567        Append app = new Append(appendRow);
4568        app.addColumn(family, qualifier, CHAR);
4569        count++;
4570        try {
4571          region.append(app);
4572        } catch (IOException e) {
4573          LOG.info("Count=" + count + ", max=" + appendCounter + ", " + e);
4574          break;
4575        }
4576      }
4577    }
4578  }
4579
4580  /**
4581   * Test case to check append function with memstore flushing
4582   * @throws Exception
4583   */
4584  @Test
4585  public void testParallelAppendWithMemStoreFlush() throws Exception {
4586    byte[] family = Appender.family;
4587    this.region = initHRegion(tableName, method, CONF, family);
4588    final HRegion region = this.region;
4589    final AtomicBoolean appendDone = new AtomicBoolean(false);
4590    Runnable flusher = new Runnable() {
4591      @Override
4592      public void run() {
4593        while (!appendDone.get()) {
4594          try {
4595            region.flush(true);
4596          } catch (Exception e) {
4597            e.printStackTrace();
4598          }
4599        }
4600      }
4601    };
4602
4603    // After all append finished, the value will append to threadNum *
4604    // appendCounter Appender.CHAR
4605    int threadNum = 20;
4606    int appendCounter = 100;
4607    byte[] expected = new byte[threadNum * appendCounter];
4608    for (int i = 0; i < threadNum * appendCounter; i++) {
4609      System.arraycopy(Appender.CHAR, 0, expected, i, 1);
4610    }
4611    Thread[] appenders = new Thread[threadNum];
4612    Thread flushThread = new Thread(flusher);
4613    for (int i = 0; i < threadNum; i++) {
4614      appenders[i] = new Thread(new Appender(this.region, appendCounter));
4615      appenders[i].start();
4616    }
4617    flushThread.start();
4618    for (int i = 0; i < threadNum; i++) {
4619      appenders[i].join();
4620    }
4621
4622    appendDone.set(true);
4623    flushThread.join();
4624
4625    Get get = new Get(Appender.appendRow);
4626    get.addColumn(Appender.family, Appender.qualifier);
4627    get.setMaxVersions(1);
4628    Result res = this.region.get(get);
4629    List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
4630
4631    // we just got the latest version
4632    assertEquals(1, kvs.size());
4633    Cell kv = kvs.get(0);
4634    byte[] appendResult = new byte[kv.getValueLength()];
4635    System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
4636    assertArrayEquals(expected, appendResult);
4637  }
4638
4639  /**
4640   * Test case to check put function with memstore flushing for same row, same ts
4641   * @throws Exception
4642   */
4643  @Test
4644  public void testPutWithMemStoreFlush() throws Exception {
4645    byte[] family = Bytes.toBytes("family");
4646    byte[] qualifier = Bytes.toBytes("qualifier");
4647    byte[] row = Bytes.toBytes("putRow");
4648    byte[] value = null;
4649    this.region = initHRegion(tableName, method, CONF, family);
4650    Put put = null;
4651    Get get = null;
4652    List<Cell> kvs = null;
4653    Result res = null;
4654
4655    put = new Put(row);
4656    value = Bytes.toBytes("value0");
4657    put.addColumn(family, qualifier, 1234567L, value);
4658    region.put(put);
4659    get = new Get(row);
4660    get.addColumn(family, qualifier);
4661    get.setMaxVersions();
4662    res = this.region.get(get);
4663    kvs = res.getColumnCells(family, qualifier);
4664    assertEquals(1, kvs.size());
4665    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4666
4667    region.flush(true);
4668    get = new Get(row);
4669    get.addColumn(family, qualifier);
4670    get.setMaxVersions();
4671    res = this.region.get(get);
4672    kvs = res.getColumnCells(family, qualifier);
4673    assertEquals(1, kvs.size());
4674    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4675
4676    put = new Put(row);
4677    value = Bytes.toBytes("value1");
4678    put.addColumn(family, qualifier, 1234567L, value);
4679    region.put(put);
4680    get = new Get(row);
4681    get.addColumn(family, qualifier);
4682    get.setMaxVersions();
4683    res = this.region.get(get);
4684    kvs = res.getColumnCells(family, qualifier);
4685    assertEquals(1, kvs.size());
4686    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4687
4688    region.flush(true);
4689    get = new Get(row);
4690    get.addColumn(family, qualifier);
4691    get.setMaxVersions();
4692    res = this.region.get(get);
4693    kvs = res.getColumnCells(family, qualifier);
4694    assertEquals(1, kvs.size());
4695    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4696  }
4697
4698  @Test
4699  public void testDurability() throws Exception {
4700    // there are 5 x 5 cases:
4701    // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation
4702    // durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT)
4703
4704    // expected cases for append and sync wal
4705    durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4706    durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4707    durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4708
4709    durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4710    durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4711    durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4712
4713    durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4714    durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4715
4716    durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
4717    durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4718
4719    durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
4720    durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
4721    durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
4722
4723    // expected cases for async wal
4724    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4725    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4726    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4727    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4728    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
4729    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4730
4731    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4732    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4733    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4734    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4735    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
4736    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
4737
4738    // expect skip wal cases
4739    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4740    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4741    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4742    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
4743    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
4744    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
4745
4746  }
4747
4748  private void durabilityTest(String method, Durability tableDurability,
4749      Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
4750      final boolean expectSyncFromLogSyncer) throws Exception {
4751    Configuration conf = HBaseConfiguration.create(CONF);
4752    method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
4753    byte[] family = Bytes.toBytes("family");
4754    Path logDir = new Path(new Path(dir + method), "log");
4755    final Configuration walConf = new Configuration(conf);
4756    CommonFSUtils.setRootDir(walConf, logDir);
4757    // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not
4758    // deal with classes which have a field of an inner class. See discussions in HBASE-15536.
4759    walConf.set(WALFactory.WAL_PROVIDER, "filesystem");
4760    final WALFactory wals = new WALFactory(walConf, TEST_UTIL.getRandomUUID().toString());
4761    final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()));
4762    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
4763        HConstants.EMPTY_END_ROW, false, tableDurability, wal,
4764        new byte[][] { family });
4765
4766    Put put = new Put(Bytes.toBytes("r1"));
4767    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
4768    put.setDurability(mutationDurability);
4769    region.put(put);
4770
4771    // verify append called or not
4772    verify(wal, expectAppend ? times(1) : never()).appendData((HRegionInfo) any(),
4773      (WALKeyImpl) any(), (WALEdit) any());
4774
4775    // verify sync called or not
4776    if (expectSync || expectSyncFromLogSyncer) {
4777      TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
4778        @Override
4779        public boolean evaluate() throws Exception {
4780          try {
4781            if (expectSync) {
4782              verify(wal, times(1)).sync(anyLong()); // Hregion calls this one
4783            } else if (expectSyncFromLogSyncer) {
4784              verify(wal, times(1)).sync(); // wal syncer calls this one
4785            }
4786          } catch (Throwable ignore) {
4787          }
4788          return true;
4789        }
4790      });
4791    } else {
4792      //verify(wal, never()).sync(anyLong());
4793      verify(wal, never()).sync();
4794    }
4795
4796    HBaseTestingUtility.closeRegionAndWAL(this.region);
4797    wals.close();
4798    this.region = null;
4799  }
4800
4801  @Test
4802  public void testRegionReplicaSecondary() throws IOException {
4803    // create a primary region, load some data and flush
4804    // create a secondary region, and do a get against that
4805    Path rootDir = new Path(dir + name.getMethodName());
4806    CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4807
4808    byte[][] families = new byte[][] {
4809        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4810    };
4811    byte[] cq = Bytes.toBytes("cq");
4812    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4813    for (byte[] family : families) {
4814      htd.addFamily(new HColumnDescriptor(family));
4815    }
4816
4817    long time = System.currentTimeMillis();
4818    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4819      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4820      false, time, 0);
4821    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4822      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4823      false, time, 1);
4824
4825    HRegion primaryRegion = null, secondaryRegion = null;
4826
4827    try {
4828      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4829          rootDir, TEST_UTIL.getConfiguration(), htd);
4830
4831      // load some data
4832      putData(primaryRegion, 0, 1000, cq, families);
4833
4834      // flush region
4835      primaryRegion.flush(true);
4836
4837      // open secondary region
4838      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4839
4840      verifyData(secondaryRegion, 0, 1000, cq, families);
4841    } finally {
4842      if (primaryRegion != null) {
4843        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4844      }
4845      if (secondaryRegion != null) {
4846        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4847      }
4848    }
4849  }
4850
4851  @Test
4852  public void testRegionReplicaSecondaryIsReadOnly() throws IOException {
4853    // create a primary region, load some data and flush
4854    // create a secondary region, and do a put against that
4855    Path rootDir = new Path(dir + name.getMethodName());
4856    CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4857
4858    byte[][] families = new byte[][] {
4859        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4860    };
4861    byte[] cq = Bytes.toBytes("cq");
4862    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4863    for (byte[] family : families) {
4864      htd.addFamily(new HColumnDescriptor(family));
4865    }
4866
4867    long time = System.currentTimeMillis();
4868    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4869      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4870      false, time, 0);
4871    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4872      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4873      false, time, 1);
4874
4875    HRegion primaryRegion = null, secondaryRegion = null;
4876
4877    try {
4878      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4879          rootDir, TEST_UTIL.getConfiguration(), htd);
4880
4881      // load some data
4882      putData(primaryRegion, 0, 1000, cq, families);
4883
4884      // flush region
4885      primaryRegion.flush(true);
4886
4887      // open secondary region
4888      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4889
4890      try {
4891        putData(secondaryRegion, 0, 1000, cq, families);
4892        fail("Should have thrown exception");
4893      } catch (IOException ex) {
4894        // expected
4895      }
4896    } finally {
4897      if (primaryRegion != null) {
4898        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4899      }
4900      if (secondaryRegion != null) {
4901        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4902      }
4903    }
4904  }
4905
4906  static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
4907    Configuration confForWAL = new Configuration(conf);
4908    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
4909    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8));
4910  }
4911
4912  @Test
4913  public void testCompactionFromPrimary() throws IOException {
4914    Path rootDir = new Path(dir + name.getMethodName());
4915    CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4916
4917    byte[][] families = new byte[][] {
4918        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4919    };
4920    byte[] cq = Bytes.toBytes("cq");
4921    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4922    for (byte[] family : families) {
4923      htd.addFamily(new HColumnDescriptor(family));
4924    }
4925
4926    long time = System.currentTimeMillis();
4927    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4928      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4929      false, time, 0);
4930    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4931      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4932      false, time, 1);
4933
4934    HRegion primaryRegion = null, secondaryRegion = null;
4935
4936    try {
4937      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4938          rootDir, TEST_UTIL.getConfiguration(), htd);
4939
4940      // load some data
4941      putData(primaryRegion, 0, 1000, cq, families);
4942
4943      // flush region
4944      primaryRegion.flush(true);
4945
4946      // open secondary region
4947      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4948
4949      // move the file of the primary region to the archive, simulating a compaction
4950      Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
4951      primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
4952      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
4953          .getStoreFiles(families[0]);
4954      Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
4955
4956      verifyData(secondaryRegion, 0, 1000, cq, families);
4957    } finally {
4958      if (primaryRegion != null) {
4959        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4960      }
4961      if (secondaryRegion != null) {
4962        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4963      }
4964    }
4965  }
4966
4967  private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws
4968      IOException {
4969    putData(this.region, startRow, numRows, qf, families);
4970  }
4971
4972  private void putData(HRegion region,
4973      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4974    putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
4975  }
4976
4977  static void putData(HRegion region, Durability durability,
4978      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4979    for (int i = startRow; i < startRow + numRows; i++) {
4980      Put put = new Put(Bytes.toBytes("" + i));
4981      put.setDurability(durability);
4982      for (byte[] family : families) {
4983        put.addColumn(family, qf, null);
4984      }
4985      region.put(put);
4986      LOG.info(put.toString());
4987    }
4988  }
4989
4990  static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
4991      throws IOException {
4992    for (int i = startRow; i < startRow + numRows; i++) {
4993      byte[] row = Bytes.toBytes("" + i);
4994      Get get = new Get(row);
4995      for (byte[] family : families) {
4996        get.addColumn(family, qf);
4997      }
4998      Result result = newReg.get(get);
4999      Cell[] raw = result.rawCells();
5000      assertEquals(families.length, result.size());
5001      for (int j = 0; j < families.length; j++) {
5002        assertTrue(CellUtil.matchingRows(raw[j], row));
5003        assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
5004        assertTrue(CellUtil.matchingQualifier(raw[j], qf));
5005      }
5006    }
5007  }
5008
5009  static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
5010    // Now I have k, get values out and assert they are as expected.
5011    Get get = new Get(k).addFamily(family).setMaxVersions();
5012    Cell[] results = r.get(get).rawCells();
5013    for (int j = 0; j < results.length; j++) {
5014      byte[] tmp = CellUtil.cloneValue(results[j]);
5015      // Row should be equal to value every time.
5016      assertTrue(Bytes.equals(k, tmp));
5017    }
5018  }
5019
5020  /*
5021   * Assert first value in the passed region is <code>firstValue</code>.
5022   *
5023   * @param r
5024   *
5025   * @param fs
5026   *
5027   * @param firstValue
5028   *
5029   * @throws IOException
5030   */
5031  protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
5032      throws IOException {
5033    byte[][] families = { fs };
5034    Scan scan = new Scan();
5035    for (int i = 0; i < families.length; i++)
5036      scan.addFamily(families[i]);
5037    InternalScanner s = r.getScanner(scan);
5038    try {
5039      List<Cell> curVals = new ArrayList<>();
5040      boolean first = true;
5041      OUTER_LOOP: while (s.next(curVals)) {
5042        for (Cell kv : curVals) {
5043          byte[] val = CellUtil.cloneValue(kv);
5044          byte[] curval = val;
5045          if (first) {
5046            first = false;
5047            assertTrue(Bytes.compareTo(curval, firstValue) == 0);
5048          } else {
5049            // Not asserting anything. Might as well break.
5050            break OUTER_LOOP;
5051          }
5052        }
5053      }
5054    } finally {
5055      s.close();
5056    }
5057  }
5058
5059  /**
5060   * Test that we get the expected flush results back
5061   */
5062  @Test
5063  public void testFlushResult() throws IOException {
5064    byte[] family = Bytes.toBytes("family");
5065
5066    this.region = initHRegion(tableName, method, family);
5067
5068    // empty memstore, flush doesn't run
5069    HRegion.FlushResult fr = region.flush(true);
5070    assertFalse(fr.isFlushSucceeded());
5071    assertFalse(fr.isCompactionNeeded());
5072
5073    // Flush enough files to get up to the threshold, doesn't need compactions
5074    for (int i = 0; i < 2; i++) {
5075      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
5076      region.put(put);
5077      fr = region.flush(true);
5078      assertTrue(fr.isFlushSucceeded());
5079      assertFalse(fr.isCompactionNeeded());
5080    }
5081
5082    // Two flushes after the threshold, compactions are needed
5083    for (int i = 0; i < 2; i++) {
5084      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
5085      region.put(put);
5086      fr = region.flush(true);
5087      assertTrue(fr.isFlushSucceeded());
5088      assertTrue(fr.isCompactionNeeded());
5089    }
5090  }
5091
5092  protected Configuration initSplit() {
5093    // Always compact if there is more than one store file.
5094    CONF.setInt("hbase.hstore.compactionThreshold", 2);
5095
5096    CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
5097
5098    // Increase the amount of time between client retries
5099    CONF.setLong("hbase.client.pause", 15 * 1000);
5100
5101    // This size should make it so we always split using the addContent
5102    // below. After adding all data, the first region is 1.3M
5103    CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
5104    return CONF;
5105  }
5106
5107  /**
5108   * @return A region on which you must call
5109   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5110   */
5111  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
5112      byte[]... families) throws IOException {
5113    return initHRegion(tableName, callingMethod, conf, false, families);
5114  }
5115
5116  /**
5117   * @return A region on which you must call
5118   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5119   */
5120  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
5121      boolean isReadOnly, byte[]... families) throws IOException {
5122    return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
5123  }
5124
5125  protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
5126      String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
5127      throws IOException {
5128    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
5129    HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
5130    final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
5131    return initHRegion(tableName, startKey, stopKey, isReadOnly,
5132        Durability.SYNC_WAL, wal, families);
5133  }
5134
5135  /**
5136   * @return A region on which you must call
5137   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5138   */
5139  public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
5140      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
5141    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
5142    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey,
5143        isReadOnly, durability, wal, families);
5144  }
5145
5146  /**
5147   * Assert that the passed in Cell has expected contents for the specified row,
5148   * column & timestamp.
5149   */
5150  private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
5151    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
5152    assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx,
5153        Bytes.toString(CellUtil.cloneRow(kv)));
5154    assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
5155        Bytes.toString(CellUtil.cloneFamily(kv)));
5156    assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
5157        Bytes.toString(CellUtil.cloneQualifier(kv)));
5158    assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
5159    assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
5160        Bytes.toString(CellUtil.cloneValue(kv)));
5161  }
5162
5163  @Test
5164  public void testReverseScanner_FromMemStore_SingleCF_Normal()
5165      throws IOException {
5166    byte[] rowC = Bytes.toBytes("rowC");
5167    byte[] rowA = Bytes.toBytes("rowA");
5168    byte[] rowB = Bytes.toBytes("rowB");
5169    byte[] cf = Bytes.toBytes("CF");
5170    byte[][] families = { cf };
5171    byte[] col = Bytes.toBytes("C");
5172    long ts = 1;
5173    this.region = initHRegion(tableName, method, families);
5174    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5175    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5176        null);
5177    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5178    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5179    Put put = null;
5180    put = new Put(rowC);
5181    put.add(kv1);
5182    put.add(kv11);
5183    region.put(put);
5184    put = new Put(rowA);
5185    put.add(kv2);
5186    region.put(put);
5187    put = new Put(rowB);
5188    put.add(kv3);
5189    region.put(put);
5190
5191    Scan scan = new Scan(rowC);
5192    scan.setMaxVersions(5);
5193    scan.setReversed(true);
5194    InternalScanner scanner = region.getScanner(scan);
5195    List<Cell> currRow = new ArrayList<>();
5196    boolean hasNext = scanner.next(currRow);
5197    assertEquals(2, currRow.size());
5198    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5199        .get(0).getRowLength(), rowC, 0, rowC.length));
5200    assertTrue(hasNext);
5201    currRow.clear();
5202    hasNext = scanner.next(currRow);
5203    assertEquals(1, currRow.size());
5204    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5205        .get(0).getRowLength(), rowB, 0, rowB.length));
5206    assertTrue(hasNext);
5207    currRow.clear();
5208    hasNext = scanner.next(currRow);
5209    assertEquals(1, currRow.size());
5210    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5211        .get(0).getRowLength(), rowA, 0, rowA.length));
5212    assertFalse(hasNext);
5213    scanner.close();
5214  }
5215
5216  @Test
5217  public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
5218      throws IOException {
5219    byte[] rowC = Bytes.toBytes("rowC");
5220    byte[] rowA = Bytes.toBytes("rowA");
5221    byte[] rowB = Bytes.toBytes("rowB");
5222    byte[] rowD = Bytes.toBytes("rowD");
5223    byte[] cf = Bytes.toBytes("CF");
5224    byte[][] families = { cf };
5225    byte[] col = Bytes.toBytes("C");
5226    long ts = 1;
5227    this.region = initHRegion(tableName, method, families);
5228    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5229    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5230        null);
5231    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5232    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5233    Put put = null;
5234    put = new Put(rowC);
5235    put.add(kv1);
5236    put.add(kv11);
5237    region.put(put);
5238    put = new Put(rowA);
5239    put.add(kv2);
5240    region.put(put);
5241    put = new Put(rowB);
5242    put.add(kv3);
5243    region.put(put);
5244
5245    Scan scan = new Scan(rowD);
5246    List<Cell> currRow = new ArrayList<>();
5247    scan.setReversed(true);
5248    scan.setMaxVersions(5);
5249    InternalScanner scanner = region.getScanner(scan);
5250    boolean hasNext = scanner.next(currRow);
5251    assertEquals(2, currRow.size());
5252    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5253        .get(0).getRowLength(), rowC, 0, rowC.length));
5254    assertTrue(hasNext);
5255    currRow.clear();
5256    hasNext = scanner.next(currRow);
5257    assertEquals(1, currRow.size());
5258    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5259        .get(0).getRowLength(), rowB, 0, rowB.length));
5260    assertTrue(hasNext);
5261    currRow.clear();
5262    hasNext = scanner.next(currRow);
5263    assertEquals(1, currRow.size());
5264    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5265        .get(0).getRowLength(), rowA, 0, rowA.length));
5266    assertFalse(hasNext);
5267    scanner.close();
5268  }
5269
5270  @Test
5271  public void testReverseScanner_FromMemStore_SingleCF_FullScan()
5272      throws IOException {
5273    byte[] rowC = Bytes.toBytes("rowC");
5274    byte[] rowA = Bytes.toBytes("rowA");
5275    byte[] rowB = Bytes.toBytes("rowB");
5276    byte[] cf = Bytes.toBytes("CF");
5277    byte[][] families = { cf };
5278    byte[] col = Bytes.toBytes("C");
5279    long ts = 1;
5280    this.region = initHRegion(tableName, method, families);
5281    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5282    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5283        null);
5284    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5285    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5286    Put put = null;
5287    put = new Put(rowC);
5288    put.add(kv1);
5289    put.add(kv11);
5290    region.put(put);
5291    put = new Put(rowA);
5292    put.add(kv2);
5293    region.put(put);
5294    put = new Put(rowB);
5295    put.add(kv3);
5296    region.put(put);
5297    Scan scan = new Scan();
5298    List<Cell> currRow = new ArrayList<>();
5299    scan.setReversed(true);
5300    InternalScanner scanner = region.getScanner(scan);
5301    boolean hasNext = scanner.next(currRow);
5302    assertEquals(1, currRow.size());
5303    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5304        .get(0).getRowLength(), rowC, 0, rowC.length));
5305    assertTrue(hasNext);
5306    currRow.clear();
5307    hasNext = scanner.next(currRow);
5308    assertEquals(1, currRow.size());
5309    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5310        .get(0).getRowLength(), rowB, 0, rowB.length));
5311    assertTrue(hasNext);
5312    currRow.clear();
5313    hasNext = scanner.next(currRow);
5314    assertEquals(1, currRow.size());
5315    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5316        .get(0).getRowLength(), rowA, 0, rowA.length));
5317    assertFalse(hasNext);
5318    scanner.close();
5319  }
5320
5321  @Test
5322  public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
5323    // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
5324    byte[] rowA = Bytes.toBytes("rowA");
5325    byte[] rowB = Bytes.toBytes("rowB");
5326    byte[] rowC = Bytes.toBytes("rowC");
5327    byte[] rowD = Bytes.toBytes("rowD");
5328    byte[] rowE = Bytes.toBytes("rowE");
5329    byte[] cf = Bytes.toBytes("CF");
5330    byte[][] families = { cf };
5331    byte[] col1 = Bytes.toBytes("col1");
5332    byte[] col2 = Bytes.toBytes("col2");
5333    long ts = 1;
5334    this.region = initHRegion(tableName, method, families);
5335    KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5336    KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5337    KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5338    KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5339    KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5340    KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5341    Put put = null;
5342    put = new Put(rowA);
5343    put.add(kv1);
5344    region.put(put);
5345    put = new Put(rowB);
5346    put.add(kv2);
5347    region.put(put);
5348    put = new Put(rowC);
5349    put.add(kv3);
5350    region.put(put);
5351    put = new Put(rowD);
5352    put.add(kv4_1);
5353    region.put(put);
5354    put = new Put(rowD);
5355    put.add(kv4_2);
5356    region.put(put);
5357    put = new Put(rowE);
5358    put.add(kv5);
5359    region.put(put);
5360    region.flush(true);
5361    Scan scan = new Scan(rowD, rowA);
5362    scan.addColumn(families[0], col1);
5363    scan.setReversed(true);
5364    List<Cell> currRow = new ArrayList<>();
5365    InternalScanner scanner = region.getScanner(scan);
5366    boolean hasNext = scanner.next(currRow);
5367    assertEquals(1, currRow.size());
5368    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5369        .get(0).getRowLength(), rowD, 0, rowD.length));
5370    assertTrue(hasNext);
5371    currRow.clear();
5372    hasNext = scanner.next(currRow);
5373    assertEquals(1, currRow.size());
5374    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5375        .get(0).getRowLength(), rowC, 0, rowC.length));
5376    assertTrue(hasNext);
5377    currRow.clear();
5378    hasNext = scanner.next(currRow);
5379    assertEquals(1, currRow.size());
5380    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5381        .get(0).getRowLength(), rowB, 0, rowB.length));
5382    assertFalse(hasNext);
5383    scanner.close();
5384
5385    scan = new Scan(rowD, rowA);
5386    scan.addColumn(families[0], col2);
5387    scan.setReversed(true);
5388    currRow.clear();
5389    scanner = region.getScanner(scan);
5390    hasNext = scanner.next(currRow);
5391    assertEquals(1, currRow.size());
5392    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5393        .get(0).getRowLength(), rowD, 0, rowD.length));
5394    scanner.close();
5395  }
5396
5397  @Test
5398  public void testReverseScanner_smaller_blocksize() throws IOException {
5399    // case to ensure no conflict with HFile index optimization
5400    byte[] rowA = Bytes.toBytes("rowA");
5401    byte[] rowB = Bytes.toBytes("rowB");
5402    byte[] rowC = Bytes.toBytes("rowC");
5403    byte[] rowD = Bytes.toBytes("rowD");
5404    byte[] rowE = Bytes.toBytes("rowE");
5405    byte[] cf = Bytes.toBytes("CF");
5406    byte[][] families = { cf };
5407    byte[] col1 = Bytes.toBytes("col1");
5408    byte[] col2 = Bytes.toBytes("col2");
5409    long ts = 1;
5410    HBaseConfiguration config = new HBaseConfiguration();
5411    config.setInt("test.block.size", 1);
5412    this.region = initHRegion(tableName, method, config, families);
5413    KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5414    KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5415    KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5416    KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5417    KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5418    KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5419    Put put = null;
5420    put = new Put(rowA);
5421    put.add(kv1);
5422    region.put(put);
5423    put = new Put(rowB);
5424    put.add(kv2);
5425    region.put(put);
5426    put = new Put(rowC);
5427    put.add(kv3);
5428    region.put(put);
5429    put = new Put(rowD);
5430    put.add(kv4_1);
5431    region.put(put);
5432    put = new Put(rowD);
5433    put.add(kv4_2);
5434    region.put(put);
5435    put = new Put(rowE);
5436    put.add(kv5);
5437    region.put(put);
5438    region.flush(true);
5439    Scan scan = new Scan(rowD, rowA);
5440    scan.addColumn(families[0], col1);
5441    scan.setReversed(true);
5442    List<Cell> currRow = new ArrayList<>();
5443    InternalScanner scanner = region.getScanner(scan);
5444    boolean hasNext = scanner.next(currRow);
5445    assertEquals(1, currRow.size());
5446    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5447        .get(0).getRowLength(), rowD, 0, rowD.length));
5448    assertTrue(hasNext);
5449    currRow.clear();
5450    hasNext = scanner.next(currRow);
5451    assertEquals(1, currRow.size());
5452    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5453        .get(0).getRowLength(), rowC, 0, rowC.length));
5454    assertTrue(hasNext);
5455    currRow.clear();
5456    hasNext = scanner.next(currRow);
5457    assertEquals(1, currRow.size());
5458    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5459        .get(0).getRowLength(), rowB, 0, rowB.length));
5460    assertFalse(hasNext);
5461    scanner.close();
5462
5463    scan = new Scan(rowD, rowA);
5464    scan.addColumn(families[0], col2);
5465    scan.setReversed(true);
5466    currRow.clear();
5467    scanner = region.getScanner(scan);
5468    hasNext = scanner.next(currRow);
5469    assertEquals(1, currRow.size());
5470    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5471        .get(0).getRowLength(), rowD, 0, rowD.length));
5472    scanner.close();
5473  }
5474
5475  @Test
5476  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
5477      throws IOException {
5478    byte[] row0 = Bytes.toBytes("row0"); // 1 kv
5479    byte[] row1 = Bytes.toBytes("row1"); // 2 kv
5480    byte[] row2 = Bytes.toBytes("row2"); // 4 kv
5481    byte[] row3 = Bytes.toBytes("row3"); // 2 kv
5482    byte[] row4 = Bytes.toBytes("row4"); // 5 kv
5483    byte[] row5 = Bytes.toBytes("row5"); // 2 kv
5484    byte[] cf1 = Bytes.toBytes("CF1");
5485    byte[] cf2 = Bytes.toBytes("CF2");
5486    byte[] cf3 = Bytes.toBytes("CF3");
5487    byte[][] families = { cf1, cf2, cf3 };
5488    byte[] col = Bytes.toBytes("C");
5489    long ts = 1;
5490    HBaseConfiguration conf = new HBaseConfiguration();
5491    // disable compactions in this test.
5492    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5493    this.region = initHRegion(tableName, method, conf, families);
5494    // kv naming style: kv(row number) totalKvCountInThisRow seq no
5495    KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
5496        null);
5497    KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
5498        null);
5499    KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
5500        KeyValue.Type.Put, null);
5501    KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
5502        null);
5503    KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
5504        null);
5505    KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
5506        null);
5507    KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
5508        KeyValue.Type.Put, null);
5509    KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
5510        null);
5511    KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
5512        KeyValue.Type.Put, null);
5513    KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
5514        null);
5515    KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
5516        null);
5517    KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
5518        KeyValue.Type.Put, null);
5519    KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
5520        null);
5521    KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
5522        KeyValue.Type.Put, null);
5523    KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
5524        null);
5525    KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
5526        null);
5527    // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv)
5528    Put put = null;
5529    put = new Put(row1);
5530    put.add(kv1_2_1);
5531    region.put(put);
5532    put = new Put(row2);
5533    put.add(kv2_4_1);
5534    region.put(put);
5535    put = new Put(row4);
5536    put.add(kv4_5_4);
5537    put.add(kv4_5_5);
5538    region.put(put);
5539    region.flush(true);
5540    // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv)
5541    put = new Put(row4);
5542    put.add(kv4_5_1);
5543    put.add(kv4_5_3);
5544    region.put(put);
5545    put = new Put(row1);
5546    put.add(kv1_2_2);
5547    region.put(put);
5548    put = new Put(row2);
5549    put.add(kv2_4_4);
5550    region.put(put);
5551    region.flush(true);
5552    // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv)
5553    put = new Put(row4);
5554    put.add(kv4_5_2);
5555    region.put(put);
5556    put = new Put(row2);
5557    put.add(kv2_4_2);
5558    put.add(kv2_4_3);
5559    region.put(put);
5560    put = new Put(row3);
5561    put.add(kv3_2_2);
5562    region.put(put);
5563    region.flush(true);
5564    // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max)
5565    // ( 2 kv)
5566    put = new Put(row0);
5567    put.add(kv0_1_1);
5568    region.put(put);
5569    put = new Put(row3);
5570    put.add(kv3_2_1);
5571    region.put(put);
5572    put = new Put(row5);
5573    put.add(kv5_2_1);
5574    put.add(kv5_2_2);
5575    region.put(put);
5576    // scan range = ["row4", min), skip the max "row5"
5577    Scan scan = new Scan(row4);
5578    scan.setMaxVersions(5);
5579    scan.setBatch(3);
5580    scan.setReversed(true);
5581    InternalScanner scanner = region.getScanner(scan);
5582    List<Cell> currRow = new ArrayList<>();
5583    boolean hasNext = false;
5584    // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
5585    // included in scan range
5586    // "row4" takes 2 next() calls since batch=3
5587    hasNext = scanner.next(currRow);
5588    assertEquals(3, currRow.size());
5589    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5590        .get(0).getRowLength(), row4, 0, row4.length));
5591    assertTrue(hasNext);
5592    currRow.clear();
5593    hasNext = scanner.next(currRow);
5594    assertEquals(2, currRow.size());
5595    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
5596        currRow.get(0).getRowLength(), row4, 0,
5597      row4.length));
5598    assertTrue(hasNext);
5599    // 2. scan out "row3" (2 kv)
5600    currRow.clear();
5601    hasNext = scanner.next(currRow);
5602    assertEquals(2, currRow.size());
5603    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5604        .get(0).getRowLength(), row3, 0, row3.length));
5605    assertTrue(hasNext);
5606    // 3. scan out "row2" (4 kvs)
5607    // "row2" takes 2 next() calls since batch=3
5608    currRow.clear();
5609    hasNext = scanner.next(currRow);
5610    assertEquals(3, currRow.size());
5611    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5612        .get(0).getRowLength(), row2, 0, row2.length));
5613    assertTrue(hasNext);
5614    currRow.clear();
5615    hasNext = scanner.next(currRow);
5616    assertEquals(1, currRow.size());
5617    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5618        .get(0).getRowLength(), row2, 0, row2.length));
5619    assertTrue(hasNext);
5620    // 4. scan out "row1" (2 kv)
5621    currRow.clear();
5622    hasNext = scanner.next(currRow);
5623    assertEquals(2, currRow.size());
5624    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5625        .get(0).getRowLength(), row1, 0, row1.length));
5626    assertTrue(hasNext);
5627    // 5. scan out "row0" (1 kv)
5628    currRow.clear();
5629    hasNext = scanner.next(currRow);
5630    assertEquals(1, currRow.size());
5631    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5632        .get(0).getRowLength(), row0, 0, row0.length));
5633    assertFalse(hasNext);
5634
5635    scanner.close();
5636  }
5637
5638  @Test
5639  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
5640      throws IOException {
5641    byte[] row1 = Bytes.toBytes("row1");
5642    byte[] row2 = Bytes.toBytes("row2");
5643    byte[] row3 = Bytes.toBytes("row3");
5644    byte[] row4 = Bytes.toBytes("row4");
5645    byte[] cf1 = Bytes.toBytes("CF1");
5646    byte[] cf2 = Bytes.toBytes("CF2");
5647    byte[] cf3 = Bytes.toBytes("CF3");
5648    byte[] cf4 = Bytes.toBytes("CF4");
5649    byte[][] families = { cf1, cf2, cf3, cf4 };
5650    byte[] col = Bytes.toBytes("C");
5651    long ts = 1;
5652    HBaseConfiguration conf = new HBaseConfiguration();
5653    // disable compactions in this test.
5654    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5655    this.region = initHRegion(tableName, method, conf, families);
5656    KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
5657    KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
5658    KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
5659    KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
5660    // storefile1
5661    Put put = new Put(row1);
5662    put.add(kv1);
5663    region.put(put);
5664    region.flush(true);
5665    // storefile2
5666    put = new Put(row2);
5667    put.add(kv2);
5668    region.put(put);
5669    region.flush(true);
5670    // storefile3
5671    put = new Put(row3);
5672    put.add(kv3);
5673    region.put(put);
5674    region.flush(true);
5675    // memstore
5676    put = new Put(row4);
5677    put.add(kv4);
5678    region.put(put);
5679    // scan range = ["row4", min)
5680    Scan scan = new Scan(row4);
5681    scan.setReversed(true);
5682    scan.setBatch(10);
5683    InternalScanner scanner = region.getScanner(scan);
5684    List<Cell> currRow = new ArrayList<>();
5685    boolean hasNext = scanner.next(currRow);
5686    assertEquals(1, currRow.size());
5687    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5688        .get(0).getRowLength(), row4, 0, row4.length));
5689    assertTrue(hasNext);
5690    currRow.clear();
5691    hasNext = scanner.next(currRow);
5692    assertEquals(1, currRow.size());
5693    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5694        .get(0).getRowLength(), row3, 0, row3.length));
5695    assertTrue(hasNext);
5696    currRow.clear();
5697    hasNext = scanner.next(currRow);
5698    assertEquals(1, currRow.size());
5699    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5700        .get(0).getRowLength(), row2, 0, row2.length));
5701    assertTrue(hasNext);
5702    currRow.clear();
5703    hasNext = scanner.next(currRow);
5704    assertEquals(1, currRow.size());
5705    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5706        .get(0).getRowLength(), row1, 0, row1.length));
5707    assertFalse(hasNext);
5708  }
5709
5710  /**
5711   * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking
5712   */
5713  @Test
5714  public void testReverseScanner_StackOverflow() throws IOException {
5715    byte[] cf1 = Bytes.toBytes("CF1");
5716    byte[][] families = {cf1};
5717    byte[] col = Bytes.toBytes("C");
5718    HBaseConfiguration conf = new HBaseConfiguration();
5719    this.region = initHRegion(tableName, method, conf, families);
5720    // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5721    Put put = new Put(Bytes.toBytes("19998"));
5722    put.addColumn(cf1, col, Bytes.toBytes("val"));
5723    region.put(put);
5724    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5725    Put put2 = new Put(Bytes.toBytes("19997"));
5726    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5727    region.put(put2);
5728
5729    Scan scan = new Scan(Bytes.toBytes("19998"));
5730    scan.setReversed(true);
5731    InternalScanner scanner = region.getScanner(scan);
5732
5733    // create one storefile contains many rows will be skipped
5734    // to check StoreFileScanner.seekToPreviousRow
5735    for (int i = 10000; i < 20000; i++) {
5736      Put p = new Put(Bytes.toBytes(""+i));
5737      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5738      region.put(p);
5739    }
5740    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5741
5742    // create one memstore contains many rows will be skipped
5743    // to check MemStoreScanner.seekToPreviousRow
5744    for (int i = 10000; i < 20000; i++) {
5745      Put p = new Put(Bytes.toBytes(""+i));
5746      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5747      region.put(p);
5748    }
5749
5750    List<Cell> currRow = new ArrayList<>();
5751    boolean hasNext;
5752    do {
5753      hasNext = scanner.next(currRow);
5754    } while (hasNext);
5755    assertEquals(2, currRow.size());
5756    assertEquals("19998", Bytes.toString(currRow.get(0).getRowArray(),
5757      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5758    assertEquals("19997", Bytes.toString(currRow.get(1).getRowArray(),
5759      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5760  }
5761
5762  @Test
5763  public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception {
5764    byte[] cf1 = Bytes.toBytes("CF1");
5765    byte[][] families = { cf1 };
5766    byte[] col = Bytes.toBytes("C");
5767    HBaseConfiguration conf = new HBaseConfiguration();
5768    this.region = initHRegion(tableName, method, conf, families);
5769    // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5770    Put put = new Put(Bytes.toBytes("19996"));
5771    put.addColumn(cf1, col, Bytes.toBytes("val"));
5772    region.put(put);
5773    Put put2 = new Put(Bytes.toBytes("19995"));
5774    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5775    region.put(put2);
5776    // create a reverse scan
5777    Scan scan = new Scan(Bytes.toBytes("19996"));
5778    scan.setReversed(true);
5779    RegionScannerImpl scanner = region.getScanner(scan);
5780
5781    // flush the cache. This will reset the store scanner
5782    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5783
5784    // create one memstore contains many rows will be skipped
5785    // to check MemStoreScanner.seekToPreviousRow
5786    for (int i = 10000; i < 20000; i++) {
5787      Put p = new Put(Bytes.toBytes("" + i));
5788      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5789      region.put(p);
5790    }
5791    List<Cell> currRow = new ArrayList<>();
5792    boolean hasNext;
5793    boolean assertDone = false;
5794    do {
5795      hasNext = scanner.next(currRow);
5796      // With HBASE-15871, after the scanner is reset the memstore scanner should not be
5797      // added here
5798      if (!assertDone) {
5799        StoreScanner current =
5800            (StoreScanner) (scanner.storeHeap).getCurrentForTesting();
5801        List<KeyValueScanner> scanners = current.getAllScannersForTesting();
5802        assertEquals("There should be only one scanner the store file scanner", 1,
5803          scanners.size());
5804        assertDone = true;
5805      }
5806    } while (hasNext);
5807    assertEquals(2, currRow.size());
5808    assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(),
5809      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5810    assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(),
5811      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5812  }
5813
5814  @Test
5815  public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception {
5816    byte[] cf1 = Bytes.toBytes("CF1");
5817    byte[][] families = { cf1 };
5818    byte[] col = Bytes.toBytes("C");
5819
5820    HBaseConfiguration conf = new HBaseConfiguration();
5821    this.region = initHRegion(tableName, method, conf, families);
5822
5823    Put put = new Put(Bytes.toBytes("199996"));
5824    put.addColumn(cf1, col, Bytes.toBytes("val"));
5825    region.put(put);
5826    Put put2 = new Put(Bytes.toBytes("199995"));
5827    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5828    region.put(put2);
5829
5830    // Create a reverse scan
5831    Scan scan = new Scan(Bytes.toBytes("199996"));
5832    scan.setReversed(true);
5833    RegionScannerImpl scanner = region.getScanner(scan);
5834
5835    // Put a lot of cells that have sequenceIDs grater than the readPt of the reverse scan
5836    for (int i = 100000; i < 200000; i++) {
5837      Put p = new Put(Bytes.toBytes("" + i));
5838      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5839      region.put(p);
5840    }
5841    List<Cell> currRow = new ArrayList<>();
5842    boolean hasNext;
5843    do {
5844      hasNext = scanner.next(currRow);
5845    } while (hasNext);
5846
5847    assertEquals(2, currRow.size());
5848    assertEquals("199996", Bytes.toString(currRow.get(0).getRowArray(),
5849      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5850    assertEquals("199995", Bytes.toString(currRow.get(1).getRowArray(),
5851      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5852  }
5853
5854  @Test
5855  public void testWriteRequestsCounter() throws IOException {
5856    byte[] fam = Bytes.toBytes("info");
5857    byte[][] families = { fam };
5858    this.region = initHRegion(tableName, method, CONF, families);
5859
5860    Assert.assertEquals(0L, region.getWriteRequestsCount());
5861
5862    Put put = new Put(row);
5863    put.addColumn(fam, fam, fam);
5864
5865    Assert.assertEquals(0L, region.getWriteRequestsCount());
5866    region.put(put);
5867    Assert.assertEquals(1L, region.getWriteRequestsCount());
5868    region.put(put);
5869    Assert.assertEquals(2L, region.getWriteRequestsCount());
5870    region.put(put);
5871    Assert.assertEquals(3L, region.getWriteRequestsCount());
5872
5873    region.delete(new Delete(row));
5874    Assert.assertEquals(4L, region.getWriteRequestsCount());
5875  }
5876
5877  @Test
5878  public void testOpenRegionWrittenToWAL() throws Exception {
5879    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
5880    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5881
5882    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
5883      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
5884      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
5885    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
5886
5887    // open the region w/o rss and wal and flush some files
5888    region =
5889         HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
5890             .getConfiguration(), htd);
5891    assertNotNull(region);
5892
5893    // create a file in fam1 for the region before opening in OpenRegionHandler
5894    region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1));
5895    region.flush(true);
5896    HBaseTestingUtility.closeRegionAndWAL(region);
5897
5898    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5899
5900    // capture append() calls
5901    WAL wal = mockWAL();
5902    when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
5903
5904    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5905      TEST_UTIL.getConfiguration(), rss, null);
5906
5907    verify(wal, times(1)).appendMarker(any(RegionInfo.class), any(WALKeyImpl.class),
5908      editCaptor.capture());
5909
5910    WALEdit edit = editCaptor.getValue();
5911    assertNotNull(edit);
5912    assertNotNull(edit.getCells());
5913    assertEquals(1, edit.getCells().size());
5914    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5915    assertNotNull(desc);
5916
5917    LOG.info("RegionEventDescriptor from WAL: " + desc);
5918
5919    assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
5920    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
5921    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5922      hri.getEncodedNameAsBytes()));
5923    assertTrue(desc.getLogSequenceNumber() > 0);
5924    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5925    assertEquals(2, desc.getStoresCount());
5926
5927    StoreDescriptor store = desc.getStores(0);
5928    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
5929    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
5930    assertEquals(1, store.getStoreFileCount()); // 1store file
5931    assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
5932
5933    store = desc.getStores(1);
5934    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
5935    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
5936    assertEquals(0, store.getStoreFileCount()); // no store files
5937  }
5938
5939  // Helper for test testOpenRegionWrittenToWALForLogReplay
5940  static class HRegionWithSeqId extends HRegion {
5941    public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
5942        final Configuration confParam, final RegionInfo regionInfo,
5943        final TableDescriptor htd, final RegionServerServices rsServices) {
5944      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
5945    }
5946    @Override
5947    protected long getNextSequenceId(WAL wal) throws IOException {
5948      return 42;
5949    }
5950  }
5951
5952  @Test
5953  public void testFlushedFileWithNoTags() throws Exception {
5954    final TableName tableName = TableName.valueOf(name.getMethodName());
5955    HTableDescriptor htd = new HTableDescriptor(tableName);
5956    htd.addFamily(new HColumnDescriptor(fam1));
5957    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
5958    Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName());
5959    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
5960    Put put = new Put(Bytes.toBytes("a-b-0-0"));
5961    put.addColumn(fam1, qual1, Bytes.toBytes("c1-value"));
5962    region.put(put);
5963    region.flush(true);
5964    HStore store = region.getStore(fam1);
5965    Collection<HStoreFile> storefiles = store.getStorefiles();
5966    for (HStoreFile sf : storefiles) {
5967      assertFalse("Tags should not be present "
5968          ,sf.getReader().getHFileReader().getFileContext().isIncludesTags());
5969    }
5970  }
5971
5972  /**
5973   * Utility method to setup a WAL mock.
5974   * <p/>
5975   * Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs.
5976   * @return a mock WAL
5977   */
5978  private WAL mockWAL() throws IOException {
5979    WAL wal = mock(WAL.class);
5980    when(wal.appendData(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class)))
5981      .thenAnswer(new Answer<Long>() {
5982        @Override
5983        public Long answer(InvocationOnMock invocation) throws Throwable {
5984          WALKeyImpl key = invocation.getArgument(1);
5985          MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
5986          key.setWriteEntry(we);
5987          return 1L;
5988        }
5989      });
5990    when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))).
5991        thenAnswer(new Answer<Long>() {
5992          @Override
5993          public Long answer(InvocationOnMock invocation) throws Throwable {
5994            WALKeyImpl key = invocation.getArgument(1);
5995            MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
5996            key.setWriteEntry(we);
5997            return 1L;
5998          }
5999        });
6000    return wal;
6001  }
6002
6003  @Test
6004  public void testCloseRegionWrittenToWAL() throws Exception {
6005    Path rootDir = new Path(dir + name.getMethodName());
6006    CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
6007
6008    final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
6009    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6010
6011    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
6012      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
6013      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
6014    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
6015
6016    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
6017
6018    // capture append() calls
6019    WAL wal = mockWAL();
6020    when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
6021
6022
6023    // create and then open a region first so that it can be closed later
6024    region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri));
6025    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
6026      TEST_UTIL.getConfiguration(), rss, null);
6027
6028    // close the region
6029    region.close(false);
6030
6031    // 2 times, one for region open, the other close region
6032    verify(wal, times(2)).appendMarker(any(RegionInfo.class),
6033        (WALKeyImpl) any(WALKeyImpl.class), editCaptor.capture());
6034
6035    WALEdit edit = editCaptor.getAllValues().get(1);
6036    assertNotNull(edit);
6037    assertNotNull(edit.getCells());
6038    assertEquals(1, edit.getCells().size());
6039    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
6040    assertNotNull(desc);
6041
6042    LOG.info("RegionEventDescriptor from WAL: " + desc);
6043
6044    assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType());
6045    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
6046    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
6047      hri.getEncodedNameAsBytes()));
6048    assertTrue(desc.getLogSequenceNumber() > 0);
6049    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
6050    assertEquals(2, desc.getStoresCount());
6051
6052    StoreDescriptor store = desc.getStores(0);
6053    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
6054    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
6055    assertEquals(0, store.getStoreFileCount()); // no store files
6056
6057    store = desc.getStores(1);
6058    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
6059    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
6060    assertEquals(0, store.getStoreFileCount()); // no store files
6061  }
6062
6063  /**
6064   * Test RegionTooBusyException thrown when region is busy
6065   */
6066  @Test
6067  public void testRegionTooBusy() throws IOException {
6068    byte[] family = Bytes.toBytes("family");
6069    long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration",
6070      HRegion.DEFAULT_BUSY_WAIT_DURATION);
6071    CONF.setLong("hbase.busy.wait.duration", 1000);
6072    region = initHRegion(tableName, method, CONF, family);
6073    final AtomicBoolean stopped = new AtomicBoolean(true);
6074    Thread t = new Thread(new Runnable() {
6075      @Override
6076      public void run() {
6077        try {
6078          region.lock.writeLock().lock();
6079          stopped.set(false);
6080          while (!stopped.get()) {
6081            Thread.sleep(100);
6082          }
6083        } catch (InterruptedException ie) {
6084        } finally {
6085          region.lock.writeLock().unlock();
6086        }
6087      }
6088    });
6089    t.start();
6090    Get get = new Get(row);
6091    try {
6092      while (stopped.get()) {
6093        Thread.sleep(100);
6094      }
6095      region.get(get);
6096      fail("Should throw RegionTooBusyException");
6097    } catch (InterruptedException ie) {
6098      fail("test interrupted");
6099    } catch (RegionTooBusyException e) {
6100      // Good, expected
6101    } finally {
6102      stopped.set(true);
6103      try {
6104        t.join();
6105      } catch (Throwable e) {
6106      }
6107
6108      HBaseTestingUtility.closeRegionAndWAL(region);
6109      region = null;
6110      CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration);
6111    }
6112  }
6113
6114  @Test
6115  public void testCellTTLs() throws IOException {
6116    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
6117    EnvironmentEdgeManager.injectEdge(edge);
6118
6119    final byte[] row = Bytes.toBytes("testRow");
6120    final byte[] q1 = Bytes.toBytes("q1");
6121    final byte[] q2 = Bytes.toBytes("q2");
6122    final byte[] q3 = Bytes.toBytes("q3");
6123    final byte[] q4 = Bytes.toBytes("q4");
6124
6125    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
6126    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
6127    hcd.setTimeToLive(10); // 10 seconds
6128    htd.addFamily(hcd);
6129
6130    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
6131    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
6132
6133    region = HBaseTestingUtility.createRegionAndWAL(new HRegionInfo(htd.getTableName(),
6134            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
6135        TEST_UTIL.getDataTestDir(), conf, htd);
6136    assertNotNull(region);
6137    long now = EnvironmentEdgeManager.currentTime();
6138    // Add a cell that will expire in 5 seconds via cell TTL
6139    region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
6140      HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
6141        // TTL tags specify ts in milliseconds
6142        new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) })));
6143    // Add a cell that will expire after 10 seconds via family setting
6144    region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
6145    // Add a cell that will expire in 15 seconds via cell TTL
6146    region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
6147      HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
6148        // TTL tags specify ts in milliseconds
6149        new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) })));
6150    // Add a cell that will expire in 20 seconds via family setting
6151    region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
6152
6153    // Flush so we are sure store scanning gets this right
6154    region.flush(true);
6155
6156    // A query at time T+0 should return all cells
6157    Result r = region.get(new Get(row));
6158    assertNotNull(r.getValue(fam1, q1));
6159    assertNotNull(r.getValue(fam1, q2));
6160    assertNotNull(r.getValue(fam1, q3));
6161    assertNotNull(r.getValue(fam1, q4));
6162
6163    // Increment time to T+5 seconds
6164    edge.incrementTime(5000);
6165
6166    r = region.get(new Get(row));
6167    assertNull(r.getValue(fam1, q1));
6168    assertNotNull(r.getValue(fam1, q2));
6169    assertNotNull(r.getValue(fam1, q3));
6170    assertNotNull(r.getValue(fam1, q4));
6171
6172    // Increment time to T+10 seconds
6173    edge.incrementTime(5000);
6174
6175    r = region.get(new Get(row));
6176    assertNull(r.getValue(fam1, q1));
6177    assertNull(r.getValue(fam1, q2));
6178    assertNotNull(r.getValue(fam1, q3));
6179    assertNotNull(r.getValue(fam1, q4));
6180
6181    // Increment time to T+15 seconds
6182    edge.incrementTime(5000);
6183
6184    r = region.get(new Get(row));
6185    assertNull(r.getValue(fam1, q1));
6186    assertNull(r.getValue(fam1, q2));
6187    assertNull(r.getValue(fam1, q3));
6188    assertNotNull(r.getValue(fam1, q4));
6189
    // Increment time to T+25 seconds (10 second step from T+15)
6191    edge.incrementTime(10000);
6192
6193    r = region.get(new Get(row));
6194    assertNull(r.getValue(fam1, q1));
6195    assertNull(r.getValue(fam1, q2));
6196    assertNull(r.getValue(fam1, q3));
6197    assertNull(r.getValue(fam1, q4));
6198
6199    // Fun with disappearing increments
6200
6201    // Start at 1
6202    region.put(new Put(row).addColumn(fam1, q1, Bytes.toBytes(1L)));
6203    r = region.get(new Get(row));
6204    byte[] val = r.getValue(fam1, q1);
6205    assertNotNull(val);
6206    assertEquals(1L, Bytes.toLong(val));
6207
6208    // Increment with a TTL of 5 seconds
6209    Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
6210    incr.setTTL(5000);
6211    region.increment(incr); // 2
6212
6213    // New value should be 2
6214    r = region.get(new Get(row));
6215    val = r.getValue(fam1, q1);
6216    assertNotNull(val);
6217    assertEquals(2L, Bytes.toLong(val));
6218
    // Increment time to T+30 seconds
6220    edge.incrementTime(5000);
6221
6222    // Value should be back to 1
6223    r = region.get(new Get(row));
6224    val = r.getValue(fam1, q1);
6225    assertNotNull(val);
6226    assertEquals(1L, Bytes.toLong(val));
6227
    // Increment time to T+35 seconds
6229    edge.incrementTime(5000);
6230
    // Original value written at T+25 should be gone now via family TTL
6232    r = region.get(new Get(row));
6233    assertNull(r.getValue(fam1, q1));
6234  }
6235
6236  @Test
6237  public void testIncrementTimestampsAreMonotonic() throws IOException {
6238    region = initHRegion(tableName, method, CONF, fam1);
6239    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6240    EnvironmentEdgeManager.injectEdge(edge);
6241
6242    edge.setValue(10);
6243    Increment inc = new Increment(row);
6244    inc.setDurability(Durability.SKIP_WAL);
6245    inc.addColumn(fam1, qual1, 1L);
6246    region.increment(inc);
6247
6248    Result result = region.get(new Get(row));
6249    Cell c = result.getColumnLatestCell(fam1, qual1);
6250    assertNotNull(c);
6251    assertEquals(10L, c.getTimestamp());
6252
6253    edge.setValue(1); // clock goes back
6254    region.increment(inc);
6255    result = region.get(new Get(row));
6256    c = result.getColumnLatestCell(fam1, qual1);
6257    assertEquals(11L, c.getTimestamp());
6258    assertEquals(2L, Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6259  }
6260
6261  @Test
6262  public void testAppendTimestampsAreMonotonic() throws IOException {
6263    region = initHRegion(tableName, method, CONF, fam1);
6264    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6265    EnvironmentEdgeManager.injectEdge(edge);
6266
6267    edge.setValue(10);
6268    Append a = new Append(row);
6269    a.setDurability(Durability.SKIP_WAL);
6270    a.addColumn(fam1, qual1, qual1);
6271    region.append(a);
6272
6273    Result result = region.get(new Get(row));
6274    Cell c = result.getColumnLatestCell(fam1, qual1);
6275    assertNotNull(c);
6276    assertEquals(10L, c.getTimestamp());
6277
6278    edge.setValue(1); // clock goes back
6279    region.append(a);
6280    result = region.get(new Get(row));
6281    c = result.getColumnLatestCell(fam1, qual1);
6282    assertEquals(11L, c.getTimestamp());
6283
6284    byte[] expected = new byte[qual1.length*2];
6285    System.arraycopy(qual1, 0, expected, 0, qual1.length);
6286    System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
6287
6288    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6289      expected, 0, expected.length));
6290  }
6291
6292  @Test
6293  public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
6294    region = initHRegion(tableName, method, CONF, fam1);
6295    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6296    EnvironmentEdgeManager.injectEdge(edge);
6297
6298    edge.setValue(10);
6299    Put p = new Put(row);
6300    p.setDurability(Durability.SKIP_WAL);
6301    p.addColumn(fam1, qual1, qual1);
6302    region.put(p);
6303
6304    Result result = region.get(new Get(row));
6305    Cell c = result.getColumnLatestCell(fam1, qual1);
6306    assertNotNull(c);
6307    assertEquals(10L, c.getTimestamp());
6308
6309    edge.setValue(1); // clock goes back
6310    p = new Put(row);
6311    p.setDurability(Durability.SKIP_WAL);
6312    p.addColumn(fam1, qual1, qual2);
6313    region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p);
6314    result = region.get(new Get(row));
6315    c = result.getColumnLatestCell(fam1, qual1);
6316    assertEquals(10L, c.getTimestamp());
6317
6318    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6319      qual2, 0, qual2.length));
6320  }
6321
6322  @Test
6323  public void testBatchMutateWithWrongRegionException() throws Exception {
6324    final byte[] a = Bytes.toBytes("a");
6325    final byte[] b = Bytes.toBytes("b");
6326    final byte[] c = Bytes.toBytes("c"); // exclusive
6327
6328    int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
6329    CONF.setInt("hbase.rowlock.wait.duration", 1000);
6330    region = initHRegion(tableName, a, c, method, CONF, false, fam1);
6331
6332    Mutation[] mutations = new Mutation[] {
6333        new Put(a)
6334            .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6335              .setRow(a)
6336              .setFamily(fam1)
6337              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6338              .setType(Cell.Type.Put)
6339              .build()),
6340        // this is outside the region boundary
6341        new Put(c).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6342              .setRow(c)
6343              .setFamily(fam1)
6344              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6345              .setType(Type.Put)
6346              .build()),
6347        new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6348              .setRow(b)
6349              .setFamily(fam1)
6350              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6351              .setType(Cell.Type.Put)
6352              .build())
6353    };
6354
6355    OperationStatus[] status = region.batchMutate(mutations);
6356    assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6357    assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, status[1].getOperationStatusCode());
6358    assertEquals(OperationStatusCode.SUCCESS, status[2].getOperationStatusCode());
6359
6360
6361    // test with a row lock held for a long time
6362    final CountDownLatch obtainedRowLock = new CountDownLatch(1);
6363    ExecutorService exec = Executors.newFixedThreadPool(2);
6364    Future<Void> f1 = exec.submit(new Callable<Void>() {
6365      @Override
6366      public Void call() throws Exception {
6367        LOG.info("Acquiring row lock");
6368        RowLock rl = region.getRowLock(b);
6369        obtainedRowLock.countDown();
6370        LOG.info("Waiting for 5 seconds before releasing lock");
6371        Threads.sleep(5000);
6372        LOG.info("Releasing row lock");
6373        rl.release();
6374        return null;
6375      }
6376    });
6377    obtainedRowLock.await(30, TimeUnit.SECONDS);
6378
6379    Future<Void> f2 = exec.submit(new Callable<Void>() {
6380      @Override
6381      public Void call() throws Exception {
6382        Mutation[] mutations = new Mutation[] {
6383            new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6384                .setRow(a)
6385                .setFamily(fam1)
6386                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6387                .setType(Cell.Type.Put)
6388                .build()),
6389            new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6390                .setRow(b)
6391                .setFamily(fam1)
6392                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6393                .setType(Cell.Type.Put)
6394                .build()),
6395        };
6396
6397        // this will wait for the row lock, and it will eventually succeed
6398        OperationStatus[] status = region.batchMutate(mutations);
6399        assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6400        assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode());
6401        return null;
6402      }
6403    });
6404
6405    f1.get();
6406    f2.get();
6407
6408    CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
6409  }
6410
6411  @Test
6412  public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
6413    region = initHRegion(tableName, method, CONF, fam1);
6414    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6415    EnvironmentEdgeManager.injectEdge(edge);
6416
6417    edge.setValue(10);
6418    Put p = new Put(row);
6419    p.setDurability(Durability.SKIP_WAL);
6420    p.addColumn(fam1, qual1, qual1);
6421    region.put(p);
6422
6423    Result result = region.get(new Get(row));
6424    Cell c = result.getColumnLatestCell(fam1, qual1);
6425    assertNotNull(c);
6426    assertEquals(10L, c.getTimestamp());
6427
6428    edge.setValue(1); // clock goes back
6429    p = new Put(row);
6430    p.setDurability(Durability.SKIP_WAL);
6431    p.addColumn(fam1, qual1, qual2);
6432    RowMutations rm = new RowMutations(row);
6433    rm.add(p);
6434    assertTrue(region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL,
6435        new BinaryComparator(qual1), rm));
6436    result = region.get(new Get(row));
6437    c = result.getColumnLatestCell(fam1, qual1);
6438    assertEquals(10L, c.getTimestamp());
6439    LOG.info("c value " +
6440      Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6441
6442    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6443      qual2, 0, qual2.length));
6444  }
6445
6446  HRegion initHRegion(TableName tableName, String callingMethod,
6447      byte[]... families) throws IOException {
6448    return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
6449        families);
6450  }
6451
6452  /**
6453   * HBASE-16429 Make sure no stuck if roll writer when ring buffer is filled with appends
6454   * @throws IOException if IO error occurred during test
6455   */
6456  @Test
6457  public void testWritesWhileRollWriter() throws IOException {
6458    int testCount = 10;
6459    int numRows = 1024;
6460    int numFamilies = 2;
6461    int numQualifiers = 2;
6462    final byte[][] families = new byte[numFamilies][];
6463    for (int i = 0; i < numFamilies; i++) {
6464      families[i] = Bytes.toBytes("family" + i);
6465    }
6466    final byte[][] qualifiers = new byte[numQualifiers][];
6467    for (int i = 0; i < numQualifiers; i++) {
6468      qualifiers[i] = Bytes.toBytes("qual" + i);
6469    }
6470
6471    CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
6472    this.region = initHRegion(tableName, method, CONF, families);
6473    try {
6474      List<Thread> threads = new ArrayList<>();
6475      for (int i = 0; i < numRows; i++) {
6476        final int count = i;
6477        Thread t = new Thread(new Runnable() {
6478
6479          @Override
6480          public void run() {
6481            byte[] row = Bytes.toBytes("row" + count);
6482            Put put = new Put(row);
6483            put.setDurability(Durability.SYNC_WAL);
6484            byte[] value = Bytes.toBytes(String.valueOf(count));
6485            for (byte[] family : families) {
6486              for (byte[] qualifier : qualifiers) {
6487                put.addColumn(family, qualifier, count, value);
6488              }
6489            }
6490            try {
6491              region.put(put);
6492            } catch (IOException e) {
6493              throw new RuntimeException(e);
6494            }
6495          }
6496        });
6497        threads.add(t);
6498      }
6499      for (Thread t : threads) {
6500        t.start();
6501      }
6502
6503      for (int i = 0; i < testCount; i++) {
6504        region.getWAL().rollWriter();
6505        Thread.yield();
6506      }
6507    } finally {
6508      try {
6509        HBaseTestingUtility.closeRegionAndWAL(this.region);
6510        CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
6511      } catch (DroppedSnapshotException dse) {
6512        // We could get this on way out because we interrupt the background flusher and it could
6513        // fail anywhere causing a DSE over in the background flusher... only it is not properly
6514        // dealt with so could still be memory hanging out when we get to here -- memory we can't
6515        // flush because the accounting is 'off' since original DSE.
6516      }
6517      this.region = null;
6518    }
6519  }
6520
6521  @Test
6522  public void testMutateRow_WriteRequestCount() throws Exception {
6523    byte[] row1 = Bytes.toBytes("row1");
6524    byte[] fam1 = Bytes.toBytes("fam1");
6525    byte[] qf1 = Bytes.toBytes("qualifier");
6526    byte[] val1 = Bytes.toBytes("value1");
6527
6528    RowMutations rm = new RowMutations(row1);
6529    Put put = new Put(row1);
6530    put.addColumn(fam1, qf1, val1);
6531    rm.add(put);
6532
6533    this.region = initHRegion(tableName, method, CONF, fam1);
6534    long wrcBeforeMutate = this.region.writeRequestsCount.longValue();
6535    this.region.mutateRow(rm);
6536    long wrcAfterMutate = this.region.writeRequestsCount.longValue();
6537    Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate);
6538  }
6539
6540  @Test
6541  public void testBulkLoadReplicationEnabled() throws IOException {
6542    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
6543    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
6544    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6545
6546    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
6547    htd.addFamily(new HColumnDescriptor(fam1));
6548    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6549        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6550    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(),
6551        rss, null);
6552
6553    assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false));
6554    String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
6555    String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();
6556    assertTrue(plugins.contains(replicationCoprocessorClass));
6557    assertTrue(region.getCoprocessorHost().
6558        getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
6559  }
6560
6561  /**
6562   * The same as HRegion class, the only difference is that instantiateHStore will
6563   * create a different HStore - HStoreForTesting. [HBASE-8518]
6564   */
6565  public static class HRegionForTesting extends HRegion {
6566
6567    public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs,
6568                             final Configuration confParam, final RegionInfo regionInfo,
6569                             final TableDescriptor htd, final RegionServerServices rsServices) {
6570      this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
6571          wal, confParam, htd, rsServices);
6572    }
6573
6574    public HRegionForTesting(HRegionFileSystem fs, WAL wal,
6575                             Configuration confParam, TableDescriptor htd,
6576                             RegionServerServices rsServices) {
6577      super(fs, wal, confParam, htd, rsServices);
6578    }
6579
6580    /**
6581     * Create HStore instance.
6582     * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting.
6583     */
6584    @Override
6585    protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
6586        throws IOException {
6587      if (family.isMobEnabled()) {
6588        if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
6589          throw new IOException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS +
6590              " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY +
6591              " accordingly.");
6592        }
6593        return new HMobStore(this, family, this.conf, warmup);
6594      }
6595      return new HStoreForTesting(this, family, this.conf, warmup);
6596    }
6597  }
6598
6599  /**
6600   * HStoreForTesting is merely the same as HStore, the difference is in the doCompaction method
6601   * of HStoreForTesting there is a checkpoint "hbase.hstore.compaction.complete" which
6602   * doesn't let hstore compaction complete. In the former edition, this config is set in
6603   * HStore class inside compact method, though this is just for testing, otherwise it
6604   * doesn't do any help. In HBASE-8518, we try to get rid of all "hbase.hstore.compaction.complete"
6605   * config (except for testing code).
6606   */
6607  public static class HStoreForTesting extends HStore {
6608
6609    protected HStoreForTesting(final HRegion region,
6610        final ColumnFamilyDescriptor family,
6611        final Configuration confParam, boolean warmup) throws IOException {
6612      super(region, family, confParam, warmup);
6613    }
6614
6615    @Override
6616    protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
6617        Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
6618        List<Path> newFiles) throws IOException {
6619      // let compaction incomplete.
6620      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
6621        LOG.warn("hbase.hstore.compaction.complete is set to false");
6622        List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
6623        final boolean evictOnClose =
6624            cacheConf != null? cacheConf.shouldEvictOnClose(): true;
6625        for (Path newFile : newFiles) {
6626          // Create storefile around what we wrote with a reader on it.
6627          HStoreFile sf = createStoreFileAndReader(newFile);
6628          sf.closeStoreFile(evictOnClose);
6629          sfs.add(sf);
6630        }
6631        return sfs;
6632      }
6633      return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles);
6634    }
6635  }
6636}