001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
023import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
024import static org.junit.Assert.assertArrayEquals;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertFalse;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertNull;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031import static org.mockito.ArgumentMatchers.any;
032import static org.mockito.ArgumentMatchers.anyLong;
033import static org.mockito.Mockito.doThrow;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.never;
036import static org.mockito.Mockito.spy;
037import static org.mockito.Mockito.times;
038import static org.mockito.Mockito.verify;
039import static org.mockito.Mockito.when;
040
041import java.io.IOException;
042import java.io.InterruptedIOException;
043import java.math.BigDecimal;
044import java.nio.charset.StandardCharsets;
045import java.security.PrivilegedExceptionAction;
046import java.util.ArrayList;
047import java.util.Arrays;
048import java.util.Collection;
049import java.util.List;
050import java.util.Map;
051import java.util.NavigableMap;
052import java.util.Objects;
053import java.util.TreeMap;
054import java.util.concurrent.Callable;
055import java.util.concurrent.CountDownLatch;
056import java.util.concurrent.ExecutorService;
057import java.util.concurrent.Executors;
058import java.util.concurrent.Future;
059import java.util.concurrent.TimeUnit;
060import java.util.concurrent.atomic.AtomicBoolean;
061import java.util.concurrent.atomic.AtomicInteger;
062import java.util.concurrent.atomic.AtomicReference;
063import org.apache.commons.lang3.RandomStringUtils;
064import org.apache.hadoop.conf.Configuration;
065import org.apache.hadoop.fs.FSDataOutputStream;
066import org.apache.hadoop.fs.FileStatus;
067import org.apache.hadoop.fs.FileSystem;
068import org.apache.hadoop.fs.Path;
069import org.apache.hadoop.hbase.ArrayBackedTag;
070import org.apache.hadoop.hbase.Cell;
071import org.apache.hadoop.hbase.Cell.Type;
072import org.apache.hadoop.hbase.CellBuilderFactory;
073import org.apache.hadoop.hbase.CellBuilderType;
074import org.apache.hadoop.hbase.CellUtil;
075import org.apache.hadoop.hbase.CompareOperator;
076import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
077import org.apache.hadoop.hbase.DroppedSnapshotException;
078import org.apache.hadoop.hbase.HBaseClassTestRule;
079import org.apache.hadoop.hbase.HBaseConfiguration;
080import org.apache.hadoop.hbase.HBaseTestingUtility;
081import org.apache.hadoop.hbase.HColumnDescriptor;
082import org.apache.hadoop.hbase.HConstants;
083import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
084import org.apache.hadoop.hbase.HDFSBlocksDistribution;
085import org.apache.hadoop.hbase.HRegionInfo;
086import org.apache.hadoop.hbase.HTableDescriptor;
087import org.apache.hadoop.hbase.KeyValue;
088import org.apache.hadoop.hbase.KeyValueUtil;
089import org.apache.hadoop.hbase.MiniHBaseCluster;
090import org.apache.hadoop.hbase.MultithreadedTestUtil;
091import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
092import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
093import org.apache.hadoop.hbase.NotServingRegionException;
094import org.apache.hadoop.hbase.PrivateCellUtil;
095import org.apache.hadoop.hbase.RegionTooBusyException;
096import org.apache.hadoop.hbase.ServerName;
097import org.apache.hadoop.hbase.TableName;
098import org.apache.hadoop.hbase.TagType;
099import org.apache.hadoop.hbase.Waiter;
100import org.apache.hadoop.hbase.client.Append;
101import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
102import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
103import org.apache.hadoop.hbase.client.Delete;
104import org.apache.hadoop.hbase.client.Durability;
105import org.apache.hadoop.hbase.client.Get;
106import org.apache.hadoop.hbase.client.Increment;
107import org.apache.hadoop.hbase.client.Mutation;
108import org.apache.hadoop.hbase.client.Put;
109import org.apache.hadoop.hbase.client.RegionInfo;
110import org.apache.hadoop.hbase.client.RegionInfoBuilder;
111import org.apache.hadoop.hbase.client.Result;
112import org.apache.hadoop.hbase.client.RowMutations;
113import org.apache.hadoop.hbase.client.Scan;
114import org.apache.hadoop.hbase.client.Table;
115import org.apache.hadoop.hbase.client.TableDescriptor;
116import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
117import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
118import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
119import org.apache.hadoop.hbase.filter.BigDecimalComparator;
120import org.apache.hadoop.hbase.filter.BinaryComparator;
121import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
122import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
123import org.apache.hadoop.hbase.filter.Filter;
124import org.apache.hadoop.hbase.filter.FilterBase;
125import org.apache.hadoop.hbase.filter.FilterList;
126import org.apache.hadoop.hbase.filter.NullComparator;
127import org.apache.hadoop.hbase.filter.PrefixFilter;
128import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
129import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
130import org.apache.hadoop.hbase.filter.SubstringComparator;
131import org.apache.hadoop.hbase.filter.ValueFilter;
132import org.apache.hadoop.hbase.io.hfile.HFile;
133import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
134import org.apache.hadoop.hbase.monitoring.MonitoredTask;
135import org.apache.hadoop.hbase.monitoring.TaskMonitor;
136import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
137import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
138import org.apache.hadoop.hbase.regionserver.Region.RowLock;
139import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
140import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
141import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
142import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
143import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
144import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
145import org.apache.hadoop.hbase.security.User;
146import org.apache.hadoop.hbase.test.MetricsAssertHelper;
147import org.apache.hadoop.hbase.testclassification.LargeTests;
148import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
149import org.apache.hadoop.hbase.util.Bytes;
150import org.apache.hadoop.hbase.util.CommonFSUtils;
151import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
152import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
153import org.apache.hadoop.hbase.util.FSUtils;
154import org.apache.hadoop.hbase.util.HFileArchiveUtil;
155import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
156import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
157import org.apache.hadoop.hbase.util.Threads;
158import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
159import org.apache.hadoop.hbase.wal.FaultyFSLog;
160import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
161import org.apache.hadoop.hbase.wal.WAL;
162import org.apache.hadoop.hbase.wal.WALEdit;
163import org.apache.hadoop.hbase.wal.WALFactory;
164import org.apache.hadoop.hbase.wal.WALKeyImpl;
165import org.apache.hadoop.hbase.wal.WALProvider;
166import org.apache.hadoop.hbase.wal.WALProvider.Writer;
167import org.apache.hadoop.hbase.wal.WALSplitter;
168import org.junit.After;
169import org.junit.Assert;
170import org.junit.Before;
171import org.junit.ClassRule;
172import org.junit.Rule;
173import org.junit.Test;
174import org.junit.experimental.categories.Category;
175import org.junit.rules.ExpectedException;
176import org.junit.rules.TestName;
177import org.mockito.ArgumentCaptor;
178import org.mockito.ArgumentMatcher;
179import org.mockito.Mockito;
180import org.mockito.invocation.InvocationOnMock;
181import org.mockito.stubbing.Answer;
182import org.slf4j.Logger;
183import org.slf4j.LoggerFactory;
184
185import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
186import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
187import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
188import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
189import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
190
191import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
198
199/**
200 * Basic stand-alone testing of HRegion.  No clusters!
201 *
202 * A lot of the meta information for an HRegion now lives inside other HRegions
203 * or in the HBaseMaster, so only basic testing is possible.
204 */
205@Category({VerySlowRegionServerTests.class, LargeTests.class})
206@SuppressWarnings("deprecation")
207public class TestHRegion {
208
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegion.class);

  // Do not spin up clusters in here. If you need to spin up a cluster, do it
  // over in TestHRegionOnCluster.
  private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class);
  // Supplies the running test's method name; setup() derives 'method' and 'tableName' from it.
  @Rule
  public TestName name = new TestName();
  // Starts out expecting no exception; individual tests may arm it.
  @Rule public final ExpectedException thrown = ExpectedException.none();

  private static final String COLUMN_FAMILY = "MyCF";
  private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
  // Event loop group handed to NettyAsyncFSWALConfigHelper in setup().
  private static final EventLoopGroup GROUP = new NioEventLoopGroup();

  // Region under test for the tests that assign it through this.region.
  HRegion region = null;
  // Do not run unit tests in parallel (? Why not?  It don't work?  Why not?  St.Ack)
  // TEST_UTIL, CONF and FILESYSTEM are static but are re-assigned by setup() before every test.
  protected static HBaseTestingUtility TEST_UTIL;
  public static Configuration CONF ;
  // String form of the "TestHRegion" data test directory, assigned in setup().
  private String dir;
  private static FileSystem FILESYSTEM;
  private final int MAX_VERSIONS = 2;

  // Test names
  // Both are assigned in setup(): 'method' is the test method name, 'tableName' is built from it.
  protected TableName tableName;
  protected String method;
  // Shared qualifier, value and row fixtures.
  protected final byte[] qual = Bytes.toBytes("qual");
  protected final byte[] qual1 = Bytes.toBytes("qual1");
  protected final byte[] qual2 = Bytes.toBytes("qual2");
  protected final byte[] qual3 = Bytes.toBytes("qual3");
  protected final byte[] value = Bytes.toBytes("value");
  protected final byte[] value1 = Bytes.toBytes("value1");
  protected final byte[] value2 = Bytes.toBytes("value2");
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");

  protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory
      .getInstance(MetricsAssertHelper.class);
247
248  @Before
249  public void setup() throws IOException {
250    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
251    FILESYSTEM = TEST_UTIL.getTestFileSystem();
252    CONF = TEST_UTIL.getConfiguration();
253    NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, GROUP, NioSocketChannel.class);
254    dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
255    method = name.getMethodName();
256    tableName = TableName.valueOf(method);
257    CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09));
258  }
259
260  @After
261  public void tearDown() throws Exception {
262    EnvironmentEdgeManagerTestHelper.reset();
263    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
264    TEST_UTIL.cleanupTestDir();
265  }
266
267  /**
268   * Test that I can use the max flushed sequence id after the close.
269   * @throws IOException
270   */
271  @Test
272  public void testSequenceId() throws IOException {
273    HRegion region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
274    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
275    // Weird. This returns 0 if no store files or no edits. Afraid to change it.
276    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
277    region.close();
278    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
279    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
280    // Open region again.
281    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
282    byte [] value = Bytes.toBytes(method);
283    // Make a random put against our cf.
284    Put put = new Put(value);
285    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
286    region.put(put);
287    // No flush yet so init numbers should still be in place.
288    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
289    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
290    region.flush(true);
291    long max = region.getMaxFlushedSeqId();
292    region.close();
293    assertEquals(max, region.getMaxFlushedSeqId());
294  }
295
296  /**
297   * Test for Bug 2 of HBASE-10466.
298   * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
299   * is smaller than a certain value, or when region close starts a flush is ongoing, the first
300   * flush is skipped and only the second flush takes place. However, two flushes are required in
301   * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
302   * in current memstore. The fix is removing all conditions except abort check so we ensure 2
303   * flushes for region close."
304   * @throws IOException
305   */
306  @Test
307  public void testCloseCarryingSnapshot() throws IOException {
308    HRegion region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
309    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
310    // Get some random bytes.
311    byte [] value = Bytes.toBytes(method);
312    // Make a random put against our cf.
313    Put put = new Put(value);
314    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
315    // First put something in current memstore, which will be in snapshot after flusher.prepare()
316    region.put(put);
317    StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
318    storeFlushCtx.prepare();
319    // Second put something in current memstore
320    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
321    region.put(put);
322    // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
323    region.close();
324    assertEquals(0, region.getMemStoreDataSize());
325    HBaseTestingUtility.closeRegionAndWAL(region);
326  }
327
328  /*
329   * This test is for verifying memstore snapshot size is correctly updated in case of rollback
330   * See HBASE-10845
331   */
332  @Test
333  public void testMemstoreSnapshotSize() throws IOException {
334    class MyFaultyFSLog extends FaultyFSLog {
335      StoreFlushContext storeFlushCtx;
336      public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
337          throws IOException {
338        super(fs, rootDir, logName, conf);
339      }
340
341      void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
342        this.storeFlushCtx = storeFlushCtx;
343      }
344
345      @Override
346      public void sync(long txid) throws IOException {
347        storeFlushCtx.prepare();
348        super.sync(txid);
349      }
350    }
351
352    FileSystem fs = FileSystem.get(CONF);
353    Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
354    MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
355    HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog,
356        COLUMN_FAMILY_BYTES);
357
358    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
359    // Get some random bytes.
360    byte [] value = Bytes.toBytes(method);
361    faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY));
362
363    Put put = new Put(value);
364    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
365    faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
366
367    boolean threwIOE = false;
368    try {
369      region.put(put);
370    } catch (IOException ioe) {
371      threwIOE = true;
372    } finally {
373      assertTrue("The regionserver should have thrown an exception", threwIOE);
374    }
375    MemStoreSize mss = store.getFlushableSize();
376    assertTrue("flushable size should be zero, but it is " + mss,
377        mss.getDataSize() == 0);
378    HBaseTestingUtility.closeRegionAndWAL(region);
379  }
380
381  /**
382   * Create a WAL outside of the usual helper in
383   * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method
384   * doesn't play nicely with FaultyFileSystem. Call this method before overriding
385   * {@code fs.file.impl}.
386   * @param callingMethod a unique component for the path, probably the name of the test method.
387   */
388  private static WAL createWALCompatibleWithFaultyFileSystem(String callingMethod,
389      Configuration conf, TableName tableName) throws IOException {
390    final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
391    final Configuration walConf = new Configuration(conf);
392    FSUtils.setRootDir(walConf, logDir);
393    return new WALFactory(walConf, callingMethod)
394        .getWAL(RegionInfoBuilder.newBuilder(tableName).build());
395  }
396
397  @Test
398  public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
399    String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
400    FileSystem fs = FileSystem.get(CONF);
401    Path rootDir = new Path(dir + testName);
402    FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
403    hLog.init();
404    HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
405        COLUMN_FAMILY_BYTES);
406    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
407    assertEquals(0, region.getMemStoreDataSize());
408
409    // Put one value
410    byte [] value = Bytes.toBytes(method);
411    Put put = new Put(value);
412    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
413    region.put(put);
414    long onePutSize = region.getMemStoreDataSize();
415    assertTrue(onePutSize > 0);
416
417    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
418    doThrow(new IOException())
419       .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
420    region.setCoprocessorHost(mockedCPHost);
421
422    put = new Put(value);
423    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
424    try {
425      region.put(put);
426      fail("Should have failed with IOException");
427    } catch (IOException expected) {
428    }
429    long expectedSize = onePutSize * 2;
430    assertEquals("memstoreSize should be incremented",
431        expectedSize, region.getMemStoreDataSize());
432    assertEquals("flushable size should be incremented",
433        expectedSize, store.getFlushableSize().getDataSize());
434
435    region.setCoprocessorHost(null);
436    HBaseTestingUtility.closeRegionAndWAL(region);
437  }
438
439  /**
440   * A test case of HBASE-21041
441   * @throws Exception Exception
442   */
443  @Test
444  public void testFlushAndMemstoreSizeCounting() throws Exception {
445    byte[] family = Bytes.toBytes("family");
446    this.region = initHRegion(tableName, method, CONF, family);
447    final WALFactory wals = new WALFactory(CONF, method);
448    try {
449      for (byte[] row : HBaseTestingUtility.ROWS) {
450        Put put = new Put(row);
451        put.addColumn(family, family, row);
452        region.put(put);
453      }
454      region.flush(true);
455      // After flush, data size should be zero
456      assertEquals(0, region.getMemStoreDataSize());
457      // After flush, a new active mutable segment is created, so the heap size
458      // should equal to MutableSegment.DEEP_OVERHEAD
459      assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
460      // After flush, offheap should be zero
461      assertEquals(0, region.getMemStoreOffHeapSize());
462    } finally {
463      HBaseTestingUtility.closeRegionAndWAL(this.region);
464      this.region = null;
465      wals.close();
466    }
467  }
468
  /**
   * Test we do not lose data if we fail a flush and then close.
   * Part of HBase-10466.  Tests the following from the issue description:
   * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
   * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
   * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
   * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
   * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
   * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
   * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
   * much smaller than expected. In extreme case, if the error accumulates to even bigger than
   * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
   * if memstoreSize is not larger than 0."
   * <p>
   * Sequence exercised here: one put, a flush that is expected to fail with
   * {@link DroppedSnapshotException}, two more cells, and then a flush that is expected to
   * succeed. The memstore data size is checked after each step.
   * @throws Exception if setting up or running the scenario fails unexpectedly
   */
  @Test
  public void testFlushSizeAccounting() throws Exception {
    final Configuration conf = HBaseConfiguration.create(CONF);
    // The WAL has to be created before fs.file.impl is overridden further down.
    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
    // Only retry once.
    conf.setInt("hbase.hstore.flush.retries.number", 1);
    final User user =
      User.createUserForTesting(conf, method, new String[]{"foo"});
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
    // Everything below runs as the test user created above.
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
        FaultyFileSystem ffs = (FaultyFileSystem)fs;
        HRegion region = null;
        try {
          // Initialize region
          region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal,
              COLUMN_FAMILY_BYTES);
          long size = region.getMemStoreDataSize();
          Assert.assertEquals(0, size);
          // Put one item into memstore.  Measure the size of one item in memstore.
          Put p1 = new Put(row);
          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null));
          region.put(p1);
          final long sizeOfOnePut = region.getMemStoreDataSize();
          // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
          try {
            LOG.info("Flushing");
            region.flush(true);
            Assert.fail("Didn't bubble up IOE!");
          } catch (DroppedSnapshotException dse) {
            // What we are expecting
            region.closing.set(false); // this is needed for the rest of the test to work
          }
          // Make it so all writes succeed from here on out
          ffs.fault.set(false);
          // Check sizes.  Should still be the one entry.
          Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize());
          // Now add two entries so that on the next flush, which is expected to succeed, we can
          // see if we subtract the right amount, the snapshot size only.
          Put p2 = new Put(row);
          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
          region.put(p2);
          // One entry left over from the failed flush plus the two just added.
          long expectedSize = sizeOfOnePut * 3;
          Assert.assertEquals(expectedSize, region.getMemStoreDataSize());
          // Do a successful flush.  It will clear the snapshot only.  Thats how flushes work.
          // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
          // it
          region.flush(true);
          // Make sure our memory accounting is right.
          Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize());
        } finally {
          // region is still null here if initHRegion threw.
          HBaseTestingUtility.closeRegionAndWAL(region);
        }
        return null;
      }
    });
    // Close the filesystems that were opened for the test user.
    FileSystem.closeAllForUGI(user.getUGI());
  }
548
549  @Test
550  public void testCloseWithFailingFlush() throws Exception {
551    final Configuration conf = HBaseConfiguration.create(CONF);
552    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
553    // Only retry once.
554    conf.setInt("hbase.hstore.flush.retries.number", 1);
555    final User user =
556      User.createUserForTesting(conf, this.method, new String[]{"foo"});
557    // Inject our faulty LocalFileSystem
558    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
559    user.runAs(new PrivilegedExceptionAction<Object>() {
560      @Override
561      public Object run() throws Exception {
562        // Make sure it worked (above is sensitive to caching details in hadoop core)
563        FileSystem fs = FileSystem.get(conf);
564        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
565        FaultyFileSystem ffs = (FaultyFileSystem)fs;
566        HRegion region = null;
567        try {
568          // Initialize region
569          region = initHRegion(tableName, null, null, false,
570              Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
571          long size = region.getMemStoreDataSize();
572          Assert.assertEquals(0, size);
573          // Put one item into memstore.  Measure the size of one item in memstore.
574          Put p1 = new Put(row);
575          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
576          region.put(p1);
577          // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
578          HStore store = region.getStore(COLUMN_FAMILY_BYTES);
579          StoreFlushContext storeFlushCtx =
580              store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
581          storeFlushCtx.prepare();
582          // Now add two entries to the foreground memstore.
583          Put p2 = new Put(row);
584          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
585          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
586          region.put(p2);
587          // Now try close on top of a failing flush.
588          region.close();
589          fail();
590        } catch (DroppedSnapshotException dse) {
591          // Expected
592          LOG.info("Expected DroppedSnapshotException");
593        } finally {
594          // Make it so all writes succeed from here on out so can close clean
595          ffs.fault.set(false);
596          HBaseTestingUtility.closeRegionAndWAL(region);
597        }
598        return null;
599      }
600    });
601    FileSystem.closeAllForUGI(user.getUGI());
602  }
603
604  @Test
605  public void testCompactionAffectedByScanners() throws Exception {
606    byte[] family = Bytes.toBytes("family");
607    this.region = initHRegion(tableName, method, CONF, family);
608
609    Put put = new Put(Bytes.toBytes("r1"));
610    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
611    region.put(put);
612    region.flush(true);
613
614    Scan scan = new Scan();
615    scan.setMaxVersions(3);
616    // open the first scanner
617    RegionScanner scanner1 = region.getScanner(scan);
618
619    Delete delete = new Delete(Bytes.toBytes("r1"));
620    region.delete(delete);
621    region.flush(true);
622
623    // open the second scanner
624    RegionScanner scanner2 = region.getScanner(scan);
625
626    List<Cell> results = new ArrayList<>();
627
628    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
629
630    // make a major compaction
631    region.compact(true);
632
633    // open the third scanner
634    RegionScanner scanner3 = region.getScanner(scan);
635
636    // get data from scanner 1, 2, 3 after major compaction
637    scanner1.next(results);
638    System.out.println(results);
639    assertEquals(1, results.size());
640
641    results.clear();
642    scanner2.next(results);
643    System.out.println(results);
644    assertEquals(0, results.size());
645
646    results.clear();
647    scanner3.next(results);
648    System.out.println(results);
649    assertEquals(0, results.size());
650  }
651
652  @Test
653  public void testToShowNPEOnRegionScannerReseek() throws Exception {
654    byte[] family = Bytes.toBytes("family");
655    this.region = initHRegion(tableName, method, CONF, family);
656
657    Put put = new Put(Bytes.toBytes("r1"));
658    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
659    region.put(put);
660    put = new Put(Bytes.toBytes("r2"));
661    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
662    region.put(put);
663    region.flush(true);
664
665    Scan scan = new Scan();
666    scan.setMaxVersions(3);
667    // open the first scanner
668    RegionScanner scanner1 = region.getScanner(scan);
669
670    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
671
672    region.compact(true);
673
674    scanner1.reseek(Bytes.toBytes("r2"));
675    List<Cell> results = new ArrayList<>();
676    scanner1.next(results);
677    Cell keyValue = results.get(0);
678    Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0);
679    scanner1.close();
680  }
681
682  @Test
683  public void testArchiveRecoveredEditsReplay() throws Exception {
684    byte[] family = Bytes.toBytes("family");
685    this.region = initHRegion(tableName, method, CONF, family);
686    final WALFactory wals = new WALFactory(CONF, method);
687    try {
688      Path regiondir = region.getRegionFileSystem().getRegionDir();
689      FileSystem fs = region.getRegionFileSystem().getFileSystem();
690      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
691
692      Path recoveredEditsDir = new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
693
694      long maxSeqId = 1050;
695      long minSeqId = 1000;
696
697      for (long i = minSeqId; i <= maxSeqId; i += 10) {
698        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
699        fs.create(recoveredEdits);
700        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
701
702        long time = System.nanoTime();
703        WALEdit edit = new WALEdit();
704        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
705          .toBytes(i)));
706        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
707          HConstants.DEFAULT_CLUSTER_ID), edit));
708
709        writer.close();
710      }
711      MonitoredTask status = TaskMonitor.get().createStatus(method);
712      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
713      for (HStore store : region.getStores()) {
714        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
715      }
716      CONF.set("hbase.region.archive.recovered.edits", "true");
717      CONF.set(CommonFSUtils.HBASE_WAL_DIR, "/custom_wal_dir");
718      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
719      assertEquals(maxSeqId, seqId);
720      region.getMVCC().advanceTo(seqId);
721      String fakeFamilyName = recoveredEditsDir.getName();
722      Path rootDir = new Path(CONF.get(HConstants.HBASE_DIR));
723      Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(rootDir,
724        region.getRegionInfo(), Bytes.toBytes(fakeFamilyName));
725      FileStatus[] list = TEST_UTIL.getTestFileSystem().listStatus(storeArchiveDir);
726      assertEquals(6, list.length);
727    } finally {
728      CONF.set("hbase.region.archive.recovered.edits", "false");
729      CONF.set(CommonFSUtils.HBASE_WAL_DIR, "");
730      HBaseTestingUtility.closeRegionAndWAL(this.region);
731      this.region = null;
732      wals.close();
733    }
734  }
735
736  @Test
737  public void testSkipRecoveredEditsReplay() throws Exception {
738    byte[] family = Bytes.toBytes("family");
739    this.region = initHRegion(tableName, method, CONF, family);
740    final WALFactory wals = new WALFactory(CONF, method);
741    try {
742      Path regiondir = region.getRegionFileSystem().getRegionDir();
743      FileSystem fs = region.getRegionFileSystem().getFileSystem();
744      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
745
746      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
747
748      long maxSeqId = 1050;
749      long minSeqId = 1000;
750
751      for (long i = minSeqId; i <= maxSeqId; i += 10) {
752        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
753        fs.create(recoveredEdits);
754        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
755
756        long time = System.nanoTime();
757        WALEdit edit = new WALEdit();
758        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
759            .toBytes(i)));
760        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
761            HConstants.DEFAULT_CLUSTER_ID), edit));
762
763        writer.close();
764      }
765      MonitoredTask status = TaskMonitor.get().createStatus(method);
766      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
767      for (HStore store : region.getStores()) {
768        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
769      }
770      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
771      assertEquals(maxSeqId, seqId);
772      region.getMVCC().advanceTo(seqId);
773      Get get = new Get(row);
774      Result result = region.get(get);
775      for (long i = minSeqId; i <= maxSeqId; i += 10) {
776        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
777        assertEquals(1, kvs.size());
778        assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
779      }
780    } finally {
781      HBaseTestingUtility.closeRegionAndWAL(this.region);
782      this.region = null;
783      wals.close();
784    }
785  }
786
787  @Test
788  public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
789    byte[] family = Bytes.toBytes("family");
790    this.region = initHRegion(tableName, method, CONF, family);
791    final WALFactory wals = new WALFactory(CONF, method);
792    try {
793      Path regiondir = region.getRegionFileSystem().getRegionDir();
794      FileSystem fs = region.getRegionFileSystem().getFileSystem();
795      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
796
797      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
798
799      long maxSeqId = 1050;
800      long minSeqId = 1000;
801
802      for (long i = minSeqId; i <= maxSeqId; i += 10) {
803        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
804        fs.create(recoveredEdits);
805        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
806
807        long time = System.nanoTime();
808        WALEdit edit = new WALEdit();
809        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
810            .toBytes(i)));
811        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
812            HConstants.DEFAULT_CLUSTER_ID), edit));
813
814        writer.close();
815      }
816      long recoverSeqId = 1030;
817      MonitoredTask status = TaskMonitor.get().createStatus(method);
818      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
819      for (HStore store : region.getStores()) {
820        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
821      }
822      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
823      assertEquals(maxSeqId, seqId);
824      region.getMVCC().advanceTo(seqId);
825      Get get = new Get(row);
826      Result result = region.get(get);
827      for (long i = minSeqId; i <= maxSeqId; i += 10) {
828        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
829        if (i < recoverSeqId) {
830          assertEquals(0, kvs.size());
831        } else {
832          assertEquals(1, kvs.size());
833          assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
834        }
835      }
836    } finally {
837      HBaseTestingUtility.closeRegionAndWAL(this.region);
838      this.region = null;
839      wals.close();
840    }
841  }
842
843  @Test
844  public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
845    byte[] family = Bytes.toBytes("family");
846    this.region = initHRegion(tableName, method, CONF, family);
847    try {
848      Path regiondir = region.getRegionFileSystem().getRegionDir();
849      FileSystem fs = region.getRegionFileSystem().getFileSystem();
850
851      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
852      for (int i = 1000; i < 1050; i += 10) {
853        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
854        FSDataOutputStream dos = fs.create(recoveredEdits);
855        dos.writeInt(i);
856        dos.close();
857      }
858      long minSeqId = 2000;
859      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1));
860      FSDataOutputStream dos = fs.create(recoveredEdits);
861      dos.close();
862
863      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
864      for (HStore store : region.getStores()) {
865        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId);
866      }
867      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
868      assertEquals(minSeqId, seqId);
869    } finally {
870      HBaseTestingUtility.closeRegionAndWAL(this.region);
871      this.region = null;
872    }
873  }
874
875  @Test
876  public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
877    byte[] family = Bytes.toBytes("family");
878    this.region = initHRegion(tableName, method, CONF, family);
879    final WALFactory wals = new WALFactory(CONF, method);
880    try {
881      Path regiondir = region.getRegionFileSystem().getRegionDir();
882      FileSystem fs = region.getRegionFileSystem().getFileSystem();
883      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
884      byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]);
885
886      assertEquals(0, region.getStoreFileList(columns).size());
887
888      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
889
890      long maxSeqId = 1050;
891      long minSeqId = 1000;
892
893      for (long i = minSeqId; i <= maxSeqId; i += 10) {
894        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
895        fs.create(recoveredEdits);
896        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
897
898        long time = System.nanoTime();
899        WALEdit edit = null;
900        if (i == maxSeqId) {
901          edit = WALEdit.createCompaction(region.getRegionInfo(),
902          CompactionDescriptor.newBuilder()
903          .setTableName(ByteString.copyFrom(tableName.getName()))
904          .setFamilyName(ByteString.copyFrom(regionName))
905          .setEncodedRegionName(ByteString.copyFrom(regionName))
906          .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
907          .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
908          .build());
909        } else {
910          edit = new WALEdit();
911          edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
912            .toBytes(i)));
913        }
914        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
915            HConstants.DEFAULT_CLUSTER_ID), edit));
916        writer.close();
917      }
918
919      long recoverSeqId = 1030;
920      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
921      MonitoredTask status = TaskMonitor.get().createStatus(method);
922      for (HStore store : region.getStores()) {
923        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
924      }
925      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
926      assertEquals(maxSeqId, seqId);
927
928      // assert that the files are flushed
929      assertEquals(1, region.getStoreFileList(columns).size());
930
931    } finally {
932      HBaseTestingUtility.closeRegionAndWAL(this.region);
933      this.region = null;
934      wals.close();
935    }
936  }
937
938  @Test
939  public void testRecoveredEditsReplayCompaction() throws Exception {
940    testRecoveredEditsReplayCompaction(false);
941    testRecoveredEditsReplayCompaction(true);
942  }
943
944  public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
945    CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class);
946    byte[] family = Bytes.toBytes("family");
947    this.region = initHRegion(tableName, method, CONF, family);
948    final WALFactory wals = new WALFactory(CONF, method);
949    try {
950      Path regiondir = region.getRegionFileSystem().getRegionDir();
951      FileSystem fs = region.getRegionFileSystem().getFileSystem();
952      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
953
954      long maxSeqId = 3;
955      long minSeqId = 0;
956
957      for (long i = minSeqId; i < maxSeqId; i++) {
958        Put put = new Put(Bytes.toBytes(i));
959        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
960        region.put(put);
961        region.flush(true);
962      }
963
964      // this will create a region with 3 files
965      assertEquals(3, region.getStore(family).getStorefilesCount());
966      List<Path> storeFiles = new ArrayList<>(3);
967      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
968        storeFiles.add(sf.getPath());
969      }
970
971      // disable compaction completion
972      CONF.setBoolean("hbase.hstore.compaction.complete", false);
973      region.compactStores();
974
975      // ensure that nothing changed
976      assertEquals(3, region.getStore(family).getStorefilesCount());
977
978      // now find the compacted file, and manually add it to the recovered edits
979      Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
980      FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
981      String errorMsg = "Expected to find 1 file in the region temp directory "
982          + "from the compaction, could not find any";
983      assertNotNull(errorMsg, files);
984      assertEquals(errorMsg, 1, files.length);
985      // move the file inside region dir
986      Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
987          files[0].getPath());
988
989      byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes();
990      byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length];
991      for (int i=0; i < encodedNameAsBytes.length; i++) {
992        // Mix the byte array to have a new encodedName
993        fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1);
994      }
995
996      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
997        .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family,
998            storeFiles, Lists.newArrayList(newFile),
999            region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
1000
1001      WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
1002          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
1003
1004      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
1005
1006      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1007      fs.create(recoveredEdits);
1008      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1009
1010      long time = System.nanoTime();
1011
1012      writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time,
1013          HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
1014          compactionDescriptor)));
1015      writer.close();
1016
1017      // close the region now, and reopen again
1018      region.getTableDescriptor();
1019      region.getRegionInfo();
1020      region.close();
1021      try {
1022        region = HRegion.openHRegion(region, null);
1023      } catch (WrongRegionException wre) {
1024        fail("Matching encoded region name should not have produced WrongRegionException");
1025      }
1026
1027      // now check whether we have only one store file, the compacted one
1028      Collection<HStoreFile> sfs = region.getStore(family).getStorefiles();
1029      for (HStoreFile sf : sfs) {
1030        LOG.info(Objects.toString(sf.getPath()));
1031      }
1032      if (!mismatchedRegionName) {
1033        assertEquals(1, region.getStore(family).getStorefilesCount());
1034      }
1035      files = FSUtils.listStatus(fs, tmpDir);
1036      assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);
1037
1038      for (long i = minSeqId; i < maxSeqId; i++) {
1039        Get get = new Get(Bytes.toBytes(i));
1040        Result result = region.get(get);
1041        byte[] value = result.getValue(family, Bytes.toBytes(i));
1042        assertArrayEquals(Bytes.toBytes(i), value);
1043      }
1044    } finally {
1045      HBaseTestingUtility.closeRegionAndWAL(this.region);
1046      this.region = null;
1047      wals.close();
1048      CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class);
1049    }
1050  }
1051
1052  @Test
1053  public void testFlushMarkers() throws Exception {
1054    // tests that flush markers are written to WAL and handled at recovered edits
1055    byte[] family = Bytes.toBytes("family");
1056    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
1057    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1058    FSUtils.setRootDir(walConf, logDir);
1059    final WALFactory wals = new WALFactory(walConf, method);
1060    final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build());
1061
1062    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1063      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1064    try {
1065      Path regiondir = region.getRegionFileSystem().getRegionDir();
1066      FileSystem fs = region.getRegionFileSystem().getFileSystem();
1067      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
1068
1069      long maxSeqId = 3;
1070      long minSeqId = 0;
1071
1072      for (long i = minSeqId; i < maxSeqId; i++) {
1073        Put put = new Put(Bytes.toBytes(i));
1074        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1075        region.put(put);
1076        region.flush(true);
1077      }
1078
1079      // this will create a region with 3 files from flush
1080      assertEquals(3, region.getStore(family).getStorefilesCount());
1081      List<String> storeFiles = new ArrayList<>(3);
1082      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
1083        storeFiles.add(sf.getPath().getName());
1084      }
1085
1086      // now verify that the flush markers are written
1087      wal.shutdown();
1088      WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
1089        TEST_UTIL.getConfiguration());
1090      try {
1091        List<WAL.Entry> flushDescriptors = new ArrayList<>();
1092        long lastFlushSeqId = -1;
1093        while (true) {
1094          WAL.Entry entry = reader.next();
1095          if (entry == null) {
1096            break;
1097          }
1098          Cell cell = entry.getEdit().getCells().get(0);
1099          if (WALEdit.isMetaEditFamily(cell)) {
1100            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
1101            assertNotNull(flushDesc);
1102            assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
1103            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1104              assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
1105            } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1106              assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
1107            }
1108            lastFlushSeqId = flushDesc.getFlushSequenceNumber();
1109            assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
1110            assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
1111            StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
1112            assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
1113            assertEquals("family", storeFlushDesc.getStoreHomeDir());
1114            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1115              assertEquals(0, storeFlushDesc.getFlushOutputCount());
1116            } else {
1117              assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
1118              assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
1119            }
1120
1121            flushDescriptors.add(entry);
1122          }
1123        }
1124
1125        assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
1126
1127        // now write those markers to the recovered edits again.
1128
1129        Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
1130
1131        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1132        fs.create(recoveredEdits);
1133        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1134
1135        for (WAL.Entry entry : flushDescriptors) {
1136          writer.append(entry);
1137        }
1138        writer.close();
1139      } finally {
1140        if (null != reader) {
1141          try {
1142            reader.close();
1143          } catch (IOException exception) {
1144            LOG.warn("Problem closing wal: " + exception.getMessage());
1145            LOG.debug("exception details", exception);
1146          }
1147        }
1148      }
1149
1150
1151      // close the region now, and reopen again
1152      region.close();
1153      region = HRegion.openHRegion(region, null);
1154
1155      // now check whether we have can read back the data from region
1156      for (long i = minSeqId; i < maxSeqId; i++) {
1157        Get get = new Get(Bytes.toBytes(i));
1158        Result result = region.get(get);
1159        byte[] value = result.getValue(family, Bytes.toBytes(i));
1160        assertArrayEquals(Bytes.toBytes(i), value);
1161      }
1162    } finally {
1163      HBaseTestingUtility.closeRegionAndWAL(this.region);
1164      this.region = null;
1165      wals.close();
1166    }
1167  }
1168
1169  static class IsFlushWALMarker implements ArgumentMatcher<WALEdit> {
1170    volatile FlushAction[] actions;
1171    public IsFlushWALMarker(FlushAction... actions) {
1172      this.actions = actions;
1173    }
1174    @Override
1175    public boolean matches(WALEdit edit) {
1176      List<Cell> cells = edit.getCells();
1177      if (cells.isEmpty()) {
1178        return false;
1179      }
1180      if (WALEdit.isMetaEditFamily(cells.get(0))) {
1181        FlushDescriptor desc;
1182        try {
1183          desc = WALEdit.getFlushDescriptor(cells.get(0));
1184        } catch (IOException e) {
1185          LOG.warn(e.toString(), e);
1186          return false;
1187        }
1188        if (desc != null) {
1189          for (FlushAction action : actions) {
1190            if (desc.getAction() == action) {
1191              return true;
1192            }
1193          }
1194        }
1195      }
1196      return false;
1197    }
1198    public IsFlushWALMarker set(FlushAction... actions) {
1199      this.actions = actions;
1200      return this;
1201    }
1202  }
1203
1204  @Test
1205  public void testFlushMarkersWALFail() throws Exception {
1206    // test the cases where the WAL append for flush markers fail.
1207    byte[] family = Bytes.toBytes("family");
1208
1209    // spy an actual WAL implementation to throw exception (was not able to mock)
1210    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log");
1211
1212    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1213    FSUtils.setRootDir(walConf, logDir);
1214    // Make up a WAL that we can manipulate at append time.
1215    class FailAppendFlushMarkerWAL extends FSHLog {
1216      volatile FlushAction [] flushActions = null;
1217
1218      public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf)
1219      throws IOException {
1220        super(fs, root, logDir, conf);
1221      }
1222
1223      @Override
1224      protected Writer createWriterInstance(Path path) throws IOException {
1225        final Writer w = super.createWriterInstance(path);
1226        return new Writer() {
1227          @Override
1228          public void close() throws IOException {
1229            w.close();
1230          }
1231
1232          @Override
1233          public void sync(boolean forceSync) throws IOException {
1234            w.sync(forceSync);
1235          }
1236
1237          @Override
1238          public void append(Entry entry) throws IOException {
1239            List<Cell> cells = entry.getEdit().getCells();
1240            if (WALEdit.isMetaEditFamily(cells.get(0))) {
1241              FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0));
1242              if (desc != null) {
1243                for (FlushAction flushAction: flushActions) {
1244                  if (desc.getAction().equals(flushAction)) {
1245                    throw new IOException("Failed to append flush marker! " + flushAction);
1246                  }
1247                }
1248              }
1249            }
1250            w.append(entry);
1251          }
1252
1253          @Override
1254          public long getLength() {
1255            return w.getLength();
1256          }
1257        };
1258      }
1259    }
1260    FailAppendFlushMarkerWAL wal =
1261      new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1262        method, walConf);
1263    wal.init();
1264    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1265      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1266    try {
1267      int i = 0;
1268      Put put = new Put(Bytes.toBytes(i));
1269      put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
1270      put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1271      region.put(put);
1272
1273      // 1. Test case where START_FLUSH throws exception
1274      wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};
1275
1276      // start cache flush will throw exception
1277      try {
1278        region.flush(true);
1279        fail("This should have thrown exception");
1280      } catch (DroppedSnapshotException unexpected) {
1281        // this should not be a dropped snapshot exception. Meaning that RS will not abort
1282        throw unexpected;
1283      } catch (IOException expected) {
1284        // expected
1285      }
1286      // The WAL is hosed now. It has two edits appended. We cannot roll the log without it
1287      // throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
1288      region.close(true);
1289      wal.close();
1290
1291      // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
1292      wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
1293      wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1294            method, walConf);
1295      wal.init();
1296      this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1297        HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1298      region.put(put);
1299
1300      // 3. Test case where ABORT_FLUSH will throw exception.
1301      // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
1302      // DroppedSnapshotException. Below COMMMIT_FLUSH will cause flush to abort
1303      wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH};
1304
1305      try {
1306        region.flush(true);
1307        fail("This should have thrown exception");
1308      } catch (DroppedSnapshotException expected) {
1309        // we expect this exception, since we were able to write the snapshot, but failed to
1310        // write the flush marker to WAL
1311      } catch (IOException unexpected) {
1312        throw unexpected;
1313      }
1314    } finally {
1315      HBaseTestingUtility.closeRegionAndWAL(this.region);
1316      this.region = null;
1317    }
1318  }
1319
1320  @Test
1321  public void testGetWhileRegionClose() throws IOException {
1322    Configuration hc = initSplit();
1323    int numRows = 100;
1324    byte[][] families = { fam1, fam2, fam3 };
1325
1326    // Setting up region
1327    this.region = initHRegion(tableName, method, hc, families);
1328    try {
1329      // Put data in region
1330      final int startRow = 100;
1331      putData(startRow, numRows, qual1, families);
1332      putData(startRow, numRows, qual2, families);
1333      putData(startRow, numRows, qual3, families);
1334      final AtomicBoolean done = new AtomicBoolean(false);
1335      final AtomicInteger gets = new AtomicInteger(0);
1336      GetTillDoneOrException[] threads = new GetTillDoneOrException[10];
1337      try {
1338        // Set ten threads running concurrently getting from the region.
1339        for (int i = 0; i < threads.length / 2; i++) {
1340          threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1341          threads[i].setDaemon(true);
1342          threads[i].start();
1343        }
1344        // Artificially make the condition by setting closing flag explicitly.
1345        // I can't make the issue happen with a call to region.close().
1346        this.region.closing.set(true);
1347        for (int i = threads.length / 2; i < threads.length; i++) {
1348          threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1349          threads[i].setDaemon(true);
1350          threads[i].start();
1351        }
1352      } finally {
1353        if (this.region != null) {
1354          HBaseTestingUtility.closeRegionAndWAL(this.region);
1355        }
1356      }
1357      done.set(true);
1358      for (GetTillDoneOrException t : threads) {
1359        try {
1360          t.join();
1361        } catch (InterruptedException e) {
1362          e.printStackTrace();
1363        }
1364        if (t.e != null) {
1365          LOG.info("Exception=" + t.e);
1366          assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
1367        }
1368      }
1369    } finally {
1370      HBaseTestingUtility.closeRegionAndWAL(this.region);
1371      this.region = null;
1372    }
1373  }
1374
1375  /*
1376   * Thread that does get on single row until 'done' flag is flipped. If an
1377   * exception causes us to fail, it records it.
1378   */
1379  class GetTillDoneOrException extends Thread {
1380    private final Get g;
1381    private final AtomicBoolean done;
1382    private final AtomicInteger count;
1383    private Exception e;
1384
1385    GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d,
1386        final AtomicInteger c) {
1387      super("getter." + i);
1388      this.g = new Get(r);
1389      this.done = d;
1390      this.count = c;
1391    }
1392
1393    @Override
1394    public void run() {
1395      while (!this.done.get()) {
1396        try {
1397          assertTrue(region.get(g).size() > 0);
1398          this.count.incrementAndGet();
1399        } catch (Exception e) {
1400          this.e = e;
1401          break;
1402        }
1403      }
1404    }
1405  }
1406
1407  /*
1408   * An involved filter test. Has multiple column families and deletes in mix.
1409   */
1410  @Test
1411  public void testWeirdCacheBehaviour() throws Exception {
1412    final TableName tableName = TableName.valueOf(name.getMethodName());
1413    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
1414        Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
1415    this.region = initHRegion(tableName, method, CONF, FAMILIES);
1416    try {
1417      String value = "this is the value";
1418      String value2 = "this is some other value";
1419      String keyPrefix1 = "prefix1";
1420      String keyPrefix2 = "prefix2";
1421      String keyPrefix3 = "prefix3";
1422      putRows(this.region, 3, value, keyPrefix1);
1423      putRows(this.region, 3, value, keyPrefix2);
1424      putRows(this.region, 3, value, keyPrefix3);
1425      putRows(this.region, 3, value2, keyPrefix1);
1426      putRows(this.region, 3, value2, keyPrefix2);
1427      putRows(this.region, 3, value2, keyPrefix3);
1428      System.out.println("Checking values for key: " + keyPrefix1);
1429      assertEquals("Got back incorrect number of rows from scan", 3,
1430          getNumberOfRows(keyPrefix1, value2, this.region));
1431      System.out.println("Checking values for key: " + keyPrefix2);
1432      assertEquals("Got back incorrect number of rows from scan", 3,
1433          getNumberOfRows(keyPrefix2, value2, this.region));
1434      System.out.println("Checking values for key: " + keyPrefix3);
1435      assertEquals("Got back incorrect number of rows from scan", 3,
1436          getNumberOfRows(keyPrefix3, value2, this.region));
1437      deleteColumns(this.region, value2, keyPrefix1);
1438      deleteColumns(this.region, value2, keyPrefix2);
1439      deleteColumns(this.region, value2, keyPrefix3);
1440      System.out.println("Starting important checks.....");
1441      assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0,
1442          getNumberOfRows(keyPrefix1, value2, this.region));
1443      assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0,
1444          getNumberOfRows(keyPrefix2, value2, this.region));
1445      assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0,
1446          getNumberOfRows(keyPrefix3, value2, this.region));
1447    } finally {
1448      HBaseTestingUtility.closeRegionAndWAL(this.region);
1449      this.region = null;
1450    }
1451  }
1452
1453  @Test
1454  public void testAppendWithReadOnlyTable() throws Exception {
1455    final TableName tableName = TableName.valueOf(name.getMethodName());
1456    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1457    boolean exceptionCaught = false;
1458    Append append = new Append(Bytes.toBytes("somerow"));
1459    append.setDurability(Durability.SKIP_WAL);
1460    append.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"),
1461        Bytes.toBytes("somevalue"));
1462    try {
1463      region.append(append);
1464    } catch (IOException e) {
1465      exceptionCaught = true;
1466    } finally {
1467      HBaseTestingUtility.closeRegionAndWAL(this.region);
1468      this.region = null;
1469    }
1470    assertTrue(exceptionCaught == true);
1471  }
1472
1473  @Test
1474  public void testIncrWithReadOnlyTable() throws Exception {
1475    final TableName tableName = TableName.valueOf(name.getMethodName());
1476    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1477    boolean exceptionCaught = false;
1478    Increment inc = new Increment(Bytes.toBytes("somerow"));
1479    inc.setDurability(Durability.SKIP_WAL);
1480    inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);
1481    try {
1482      region.increment(inc);
1483    } catch (IOException e) {
1484      exceptionCaught = true;
1485    } finally {
1486      HBaseTestingUtility.closeRegionAndWAL(this.region);
1487      this.region = null;
1488    }
1489    assertTrue(exceptionCaught == true);
1490  }
1491
1492  private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException {
1493    InternalScanner scanner = buildScanner(keyPrefix, value, r);
1494    int count = 0;
1495    boolean more = false;
1496    List<Cell> results = new ArrayList<>();
1497    do {
1498      more = scanner.next(results);
1499      if (results != null && !results.isEmpty())
1500        count++;
1501      else
1502        break;
1503      Delete delete = new Delete(CellUtil.cloneRow(results.get(0)));
1504      delete.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
1505      r.delete(delete);
1506      results.clear();
1507    } while (more);
1508    assertEquals("Did not perform correct number of deletes", 3, count);
1509  }
1510
1511  private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception {
1512    InternalScanner resultScanner = buildScanner(keyPrefix, value, r);
1513    int numberOfResults = 0;
1514    List<Cell> results = new ArrayList<>();
1515    boolean more = false;
1516    do {
1517      more = resultScanner.next(results);
1518      if (results != null && !results.isEmpty())
1519        numberOfResults++;
1520      else
1521        break;
1522      for (Cell kv : results) {
1523        System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv)));
1524      }
1525      results.clear();
1526    } while (more);
1527    return numberOfResults;
1528  }
1529
1530  private InternalScanner buildScanner(String keyPrefix, String value, HRegion r)
1531      throws IOException {
1532    // Defaults FilterList.Operator.MUST_PASS_ALL.
1533    FilterList allFilters = new FilterList();
1534    allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
1535    // Only return rows where this column value exists in the row.
1536    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"),
1537        Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value));
1538    filter.setFilterIfMissing(true);
1539    allFilters.addFilter(filter);
1540    Scan scan = new Scan();
1541    scan.addFamily(Bytes.toBytes("trans-blob"));
1542    scan.addFamily(Bytes.toBytes("trans-type"));
1543    scan.addFamily(Bytes.toBytes("trans-date"));
1544    scan.addFamily(Bytes.toBytes("trans-tags"));
1545    scan.addFamily(Bytes.toBytes("trans-group"));
1546    scan.setFilter(allFilters);
1547    return r.getScanner(scan);
1548  }
1549
1550  private void putRows(HRegion r, int numRows, String value, String key) throws IOException {
1551    for (int i = 0; i < numRows; i++) {
1552      String row = key + "_" + i/* UUID.randomUUID().toString() */;
1553      System.out.println(String.format("Saving row: %s, with value %s", row, value));
1554      Put put = new Put(Bytes.toBytes(row));
1555      put.setDurability(Durability.SKIP_WAL);
1556      put.addColumn(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob"));
1557      put.addColumn(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
1558      put.addColumn(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999"));
1559      put.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value));
1560      put.addColumn(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId"));
1561      r.put(put);
1562    }
1563  }
1564
1565  @Test
1566  public void testFamilyWithAndWithoutColon() throws Exception {
1567    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1568    this.region = initHRegion(tableName, method, CONF, cf);
1569    try {
1570      Put p = new Put(tableName.toBytes());
1571      byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
1572      p.addColumn(cfwithcolon, cfwithcolon, cfwithcolon);
1573      boolean exception = false;
1574      try {
1575        this.region.put(p);
1576      } catch (NoSuchColumnFamilyException e) {
1577        exception = true;
1578      }
1579      assertTrue(exception);
1580    } finally {
1581      HBaseTestingUtility.closeRegionAndWAL(this.region);
1582      this.region = null;
1583    }
1584  }
1585
1586  @Test
1587  public void testBatchPut_whileNoRowLocksHeld() throws IOException {
1588    final Put[] puts = new Put[10];
1589    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1590    try {
1591      long syncs = prepareRegionForBachPut(puts, source, false);
1592
1593      OperationStatus[] codes = this.region.batchMutate(puts);
1594      assertEquals(10, codes.length);
1595      for (int i = 0; i < 10; i++) {
1596        assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1597      }
1598      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1599
1600      LOG.info("Next a batch put with one invalid family");
1601      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1602      codes = this.region.batchMutate(puts);
1603      assertEquals(10, codes.length);
1604      for (int i = 0; i < 10; i++) {
1605        assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1606            codes[i].getOperationStatusCode());
1607      }
1608
1609      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
1610    } finally {
1611      HBaseTestingUtility.closeRegionAndWAL(this.region);
1612      this.region = null;
1613    }
1614  }
1615
1616  @Test
1617  public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
1618    final Put[] puts = new Put[10];
1619    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1620    try {
1621      long syncs = prepareRegionForBachPut(puts, source, false);
1622
1623      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1624
1625      LOG.info("batchPut will have to break into four batches to avoid row locks");
1626      RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
1627      RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1"));
1628      RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
1629      RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);
1630
1631      MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1632      final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>();
1633      final CountDownLatch startingPuts = new CountDownLatch(1);
1634      final CountDownLatch startingClose = new CountDownLatch(1);
1635      TestThread putter = new TestThread(ctx) {
1636        @Override
1637        public void doWork() throws IOException {
1638          startingPuts.countDown();
1639          retFromThread.set(region.batchMutate(puts));
1640        }
1641      };
1642      LOG.info("...starting put thread while holding locks");
1643      ctx.addThread(putter);
1644      ctx.startThreads();
1645
1646      // Now attempt to close the region from another thread.  Prior to HBASE-12565
1647      // this would cause the in-progress batchMutate operation to to fail with
1648      // exception because it use to release and re-acquire the close-guard lock
1649      // between batches.  Caller then didn't get status indicating which writes succeeded.
1650      // We now expect this thread to block until the batchMutate call finishes.
1651      Thread regionCloseThread = new TestThread(ctx) {
1652        @Override
1653        public void doWork() {
1654          try {
1655            startingPuts.await();
1656            // Give some time for the batch mutate to get in.
1657            // We don't want to race with the mutate
1658            Thread.sleep(10);
1659            startingClose.countDown();
1660            HBaseTestingUtility.closeRegionAndWAL(region);
1661          } catch (IOException e) {
1662            throw new RuntimeException(e);
1663          } catch (InterruptedException e) {
1664            throw new RuntimeException(e);
1665          }
1666        }
1667      };
1668      regionCloseThread.start();
1669
1670      startingClose.await();
1671      startingPuts.await();
1672      Thread.sleep(100);
1673      LOG.info("...releasing row lock 1, which should let put thread continue");
1674      rowLock1.release();
1675      rowLock2.release();
1676      rowLock3.release();
1677      waitForCounter(source, "syncTimeNumOps", syncs + 1);
1678
1679      LOG.info("...joining on put thread");
1680      ctx.stop();
1681      regionCloseThread.join();
1682
1683      OperationStatus[] codes = retFromThread.get();
1684      for (int i = 0; i < codes.length; i++) {
1685        assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1686            codes[i].getOperationStatusCode());
1687      }
1688      rowLock4.release();
1689    } finally {
1690      HBaseTestingUtility.closeRegionAndWAL(this.region);
1691      this.region = null;
1692    }
1693  }
1694
1695  private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
1696      throws InterruptedException {
1697    long startWait = System.currentTimeMillis();
1698    long currentCount;
1699    while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
1700      Thread.sleep(100);
1701      if (System.currentTimeMillis() - startWait > 10000) {
1702        fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
1703            expectedCount, currentCount));
1704      }
1705    }
1706  }
1707
1708  @Test
1709  public void testAtomicBatchPut() throws IOException {
1710    final Put[] puts = new Put[10];
1711    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1712    try {
1713      long syncs = prepareRegionForBachPut(puts, source, false);
1714
1715      // 1. Straight forward case, should succeed
1716      MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true,
1717          HConstants.NO_NONCE, HConstants.NO_NONCE);
1718      OperationStatus[] codes = this.region.batchMutate(batchOp);
1719      assertEquals(10, codes.length);
1720      for (int i = 0; i < 10; i++) {
1721        assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1722      }
1723      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1724
1725      // 2. Failed to get lock
1726      RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
1727      // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this
1728      // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread
1729      MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1730      final AtomicReference<IOException> retFromThread = new AtomicReference<>();
1731      final CountDownLatch finishedPuts = new CountDownLatch(1);
1732      final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
1733          HConstants
1734          .NO_NONCE,
1735          HConstants.NO_NONCE);
1736      TestThread putter = new TestThread(ctx) {
1737        @Override
1738        public void doWork() throws IOException {
1739          try {
1740            region.batchMutate(finalBatchOp);
1741          } catch (IOException ioe) {
1742            LOG.error("test failed!", ioe);
1743            retFromThread.set(ioe);
1744          }
1745          finishedPuts.countDown();
1746        }
1747      };
1748      LOG.info("...starting put thread while holding locks");
1749      ctx.addThread(putter);
1750      ctx.startThreads();
1751      LOG.info("...waiting for batch puts while holding locks");
1752      try {
1753        finishedPuts.await();
1754      } catch (InterruptedException e) {
1755        LOG.error("Interrupted!", e);
1756      } finally {
1757        if (lock != null) {
1758          lock.release();
1759        }
1760      }
1761      assertNotNull(retFromThread.get());
1762      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1763
1764      // 3. Exception thrown in validation
1765      LOG.info("Next a batch put with one invalid family");
1766      puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1767      batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
1768          HConstants.NO_NONCE);
1769      thrown.expect(NoSuchColumnFamilyException.class);
1770      this.region.batchMutate(batchOp);
1771    } finally {
1772      HBaseTestingUtility.closeRegionAndWAL(this.region);
1773      this.region = null;
1774    }
1775  }
1776
1777  @Test
1778  public void testBatchPutWithTsSlop() throws Exception {
1779    // add data with a timestamp that is too recent for range. Ensure assert
1780    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1781    final Put[] puts = new Put[10];
1782    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1783
1784    try {
1785      long syncs = prepareRegionForBachPut(puts, source, true);
1786
1787      OperationStatus[] codes = this.region.batchMutate(puts);
1788      assertEquals(10, codes.length);
1789      for (int i = 0; i < 10; i++) {
1790        assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
1791      }
1792      metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1793    } finally {
1794      HBaseTestingUtility.closeRegionAndWAL(this.region);
1795      this.region = null;
1796    }
1797  }
1798
1799  /**
1800   * @return syncs initial syncTimeNumOps
1801   */
1802  private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source,
1803      boolean slop) throws IOException {
1804    this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
1805
1806    LOG.info("First a batch put with all valid puts");
1807    for (int i = 0; i < puts.length; i++) {
1808      puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) :
1809          new Put(Bytes.toBytes("row_" + i));
1810      puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value);
1811    }
1812
1813    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1814    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1815    return syncs;
1816  }
1817
1818  // ////////////////////////////////////////////////////////////////////////////
1819  // checkAndMutate tests
1820  // ////////////////////////////////////////////////////////////////////////////
1821  @Test
1822  public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
1823    byte[] row1 = Bytes.toBytes("row1");
1824    byte[] fam1 = Bytes.toBytes("fam1");
1825    byte[] qf1 = Bytes.toBytes("qualifier");
1826    byte[] emptyVal = new byte[] {};
1827    byte[] val1 = Bytes.toBytes("value1");
1828    byte[] val2 = Bytes.toBytes("value2");
1829
1830    // Setting up region
1831    this.region = initHRegion(tableName, method, CONF, fam1);
1832    try {
1833      // Putting empty data in key
1834      Put put = new Put(row1);
1835      put.addColumn(fam1, qf1, emptyVal);
1836
1837      // checkAndPut with empty value
1838      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
1839          emptyVal), put);
1840      assertTrue(res);
1841
1842      // Putting data in key
1843      put = new Put(row1);
1844      put.addColumn(fam1, qf1, val1);
1845
1846      // checkAndPut with correct value
1847      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
1848          put);
1849      assertTrue(res);
1850
1851      // not empty anymore
1852      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
1853          put);
1854      assertFalse(res);
1855
1856      Delete delete = new Delete(row1);
1857      delete.addColumn(fam1, qf1);
1858      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
1859          delete);
1860      assertFalse(res);
1861
1862      put = new Put(row1);
1863      put.addColumn(fam1, qf1, val2);
1864      // checkAndPut with correct value
1865      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
1866          put);
1867      assertTrue(res);
1868
1869      // checkAndDelete with correct value
1870      delete = new Delete(row1);
1871      delete.addColumn(fam1, qf1);
1872      delete.addColumn(fam1, qf1);
1873      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2),
1874          delete);
1875      assertTrue(res);
1876
1877      delete = new Delete(row1);
1878      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
1879          delete);
1880      assertTrue(res);
1881
1882      // checkAndPut looking for a null value
1883      put = new Put(row1);
1884      put.addColumn(fam1, qf1, val1);
1885
1886      res = region
1887          .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put);
1888      assertTrue(res);
1889    } finally {
1890      HBaseTestingUtility.closeRegionAndWAL(this.region);
1891      this.region = null;
1892    }
1893  }
1894
1895  @Test
1896  public void testCheckAndMutate_WithWrongValue() throws IOException {
1897    byte[] row1 = Bytes.toBytes("row1");
1898    byte[] fam1 = Bytes.toBytes("fam1");
1899    byte[] qf1 = Bytes.toBytes("qualifier");
1900    byte[] val1 = Bytes.toBytes("value1");
1901    byte[] val2 = Bytes.toBytes("value2");
1902    BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
1903    BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
1904
1905    // Setting up region
1906    this.region = initHRegion(tableName, method, CONF, fam1);
1907    try {
1908      // Putting data in key
1909      Put put = new Put(row1);
1910      put.addColumn(fam1, qf1, val1);
1911      region.put(put);
1912
1913      // checkAndPut with wrong value
1914      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
1915          val2), put);
1916      assertEquals(false, res);
1917
1918      // checkAndDelete with wrong value
1919      Delete delete = new Delete(row1);
1920      delete.addFamily(fam1);
1921      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val2),
1922          put);
1923      assertEquals(false, res);
1924
1925      // Putting data in key
1926      put = new Put(row1);
1927      put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1928      region.put(put);
1929
1930      // checkAndPut with wrong value
1931      res =
1932          region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1933              bd2), put);
1934      assertEquals(false, res);
1935
1936      // checkAndDelete with wrong value
1937      delete = new Delete(row1);
1938      delete.addFamily(fam1);
1939      res =
1940          region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1941              bd2), put);
1942      assertEquals(false, res);
1943    } finally {
1944      HBaseTestingUtility.closeRegionAndWAL(this.region);
1945      this.region = null;
1946    }
1947  }
1948
1949  @Test
1950  public void testCheckAndMutate_WithCorrectValue() throws IOException {
1951    byte[] row1 = Bytes.toBytes("row1");
1952    byte[] fam1 = Bytes.toBytes("fam1");
1953    byte[] qf1 = Bytes.toBytes("qualifier");
1954    byte[] val1 = Bytes.toBytes("value1");
1955    BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
1956
1957    // Setting up region
1958    this.region = initHRegion(tableName, method, CONF, fam1);
1959    try {
1960      // Putting data in key
1961      Put put = new Put(row1);
1962      put.addColumn(fam1, qf1, val1);
1963      region.put(put);
1964
1965      // checkAndPut with correct value
1966      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
1967          val1), put);
1968      assertEquals(true, res);
1969
1970      // checkAndDelete with correct value
1971      Delete delete = new Delete(row1);
1972      delete.addColumn(fam1, qf1);
1973      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
1974          delete);
1975      assertEquals(true, res);
1976
1977      // Putting data in key
1978      put = new Put(row1);
1979      put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1980      region.put(put);
1981
1982      // checkAndPut with correct value
1983      res =
1984          region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1985              bd1), put);
1986      assertEquals(true, res);
1987
1988      // checkAndDelete with correct value
1989      delete = new Delete(row1);
1990      delete.addColumn(fam1, qf1);
1991      res =
1992          region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1993              bd1), delete);
1994      assertEquals(true, res);
1995    } finally {
1996      HBaseTestingUtility.closeRegionAndWAL(this.region);
1997      this.region = null;
1998    }
1999  }
2000
2001  @Test
2002  public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
2003    byte[] row1 = Bytes.toBytes("row1");
2004    byte[] fam1 = Bytes.toBytes("fam1");
2005    byte[] qf1 = Bytes.toBytes("qualifier");
2006    byte[] val1 = Bytes.toBytes("value1");
2007    byte[] val2 = Bytes.toBytes("value2");
2008    byte[] val3 = Bytes.toBytes("value3");
2009    byte[] val4 = Bytes.toBytes("value4");
2010
2011    // Setting up region
2012    this.region = initHRegion(tableName, method, CONF, fam1);
2013    try {
2014      // Putting val3 in key
2015      Put put = new Put(row1);
2016      put.addColumn(fam1, qf1, val3);
2017      region.put(put);
2018
2019      // Test CompareOp.LESS: original = val3, compare with val3, fail
2020      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
2021          new BinaryComparator(val3), put);
2022      assertEquals(false, res);
2023
2024      // Test CompareOp.LESS: original = val3, compare with val4, fail
2025      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
2026          new BinaryComparator(val4), put);
2027      assertEquals(false, res);
2028
2029      // Test CompareOp.LESS: original = val3, compare with val2,
2030      // succeed (now value = val2)
2031      put = new Put(row1);
2032      put.addColumn(fam1, qf1, val2);
2033      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
2034          new BinaryComparator(val2), put);
2035      assertEquals(true, res);
2036
2037      // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
2038      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
2039          new BinaryComparator(val3), put);
2040      assertEquals(false, res);
2041
2042      // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
2043      // succeed (value still = val2)
2044      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
2045          new BinaryComparator(val2), put);
2046      assertEquals(true, res);
2047
2048      // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
2049      // succeed (now value = val3)
2050      put = new Put(row1);
2051      put.addColumn(fam1, qf1, val3);
2052      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
2053          new BinaryComparator(val1), put);
2054      assertEquals(true, res);
2055
2056      // Test CompareOp.GREATER: original = val3, compare with val3, fail
2057      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
2058          new BinaryComparator(val3), put);
2059      assertEquals(false, res);
2060
2061      // Test CompareOp.GREATER: original = val3, compare with val2, fail
2062      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
2063          new BinaryComparator(val2), put);
2064      assertEquals(false, res);
2065
2066      // Test CompareOp.GREATER: original = val3, compare with val4,
2067      // succeed (now value = val2)
2068      put = new Put(row1);
2069      put.addColumn(fam1, qf1, val2);
2070      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
2071          new BinaryComparator(val4), put);
2072      assertEquals(true, res);
2073
2074      // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
2075      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2076          new BinaryComparator(val1), put);
2077      assertEquals(false, res);
2078
2079      // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
2080      // succeed (value still = val2)
2081      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2082          new BinaryComparator(val2), put);
2083      assertEquals(true, res);
2084
2085      // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
2086      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2087          new BinaryComparator(val3), put);
2088      assertEquals(true, res);
2089    } finally {
2090      HBaseTestingUtility.closeRegionAndWAL(this.region);
2091      this.region = null;
2092    }
2093  }
2094
2095  @Test
2096  public void testCheckAndPut_ThatPutWasWritten() throws IOException {
2097    byte[] row1 = Bytes.toBytes("row1");
2098    byte[] fam1 = Bytes.toBytes("fam1");
2099    byte[] fam2 = Bytes.toBytes("fam2");
2100    byte[] qf1 = Bytes.toBytes("qualifier");
2101    byte[] val1 = Bytes.toBytes("value1");
2102    byte[] val2 = Bytes.toBytes("value2");
2103
2104    byte[][] families = { fam1, fam2 };
2105
2106    // Setting up region
2107    this.region = initHRegion(tableName, method, CONF, families);
2108    try {
2109      // Putting data in the key to check
2110      Put put = new Put(row1);
2111      put.addColumn(fam1, qf1, val1);
2112      region.put(put);
2113
2114      // Creating put to add
2115      long ts = System.currentTimeMillis();
2116      KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
2117      put = new Put(row1);
2118      put.add(kv);
2119
2120      // checkAndPut with wrong value
2121      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
2122          val1), put);
2123      assertEquals(true, res);
2124
2125      Get get = new Get(row1);
2126      get.addColumn(fam2, qf1);
2127      Cell[] actual = region.get(get).rawCells();
2128
2129      Cell[] expected = { kv };
2130
2131      assertEquals(expected.length, actual.length);
2132      for (int i = 0; i < actual.length; i++) {
2133        assertEquals(expected[i], actual[i]);
2134      }
2135    } finally {
2136      HBaseTestingUtility.closeRegionAndWAL(this.region);
2137      this.region = null;
2138    }
2139  }
2140
2141  @Test
2142  public void testCheckAndPut_wrongRowInPut() throws IOException {
2143    this.region = initHRegion(tableName, method, CONF, COLUMNS);
2144    try {
2145      Put put = new Put(row2);
2146      put.addColumn(fam1, qual1, value1);
2147      try {
2148        region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL,
2149            new BinaryComparator(value2), put);
2150        fail();
2151      } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
2152        // expected exception.
2153      }
2154    } finally {
2155      HBaseTestingUtility.closeRegionAndWAL(this.region);
2156      this.region = null;
2157    }
2158  }
2159
2160  @Test
2161  public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
2162    byte[] row1 = Bytes.toBytes("row1");
2163    byte[] fam1 = Bytes.toBytes("fam1");
2164    byte[] fam2 = Bytes.toBytes("fam2");
2165    byte[] qf1 = Bytes.toBytes("qualifier1");
2166    byte[] qf2 = Bytes.toBytes("qualifier2");
2167    byte[] qf3 = Bytes.toBytes("qualifier3");
2168    byte[] val1 = Bytes.toBytes("value1");
2169    byte[] val2 = Bytes.toBytes("value2");
2170    byte[] val3 = Bytes.toBytes("value3");
2171    byte[] emptyVal = new byte[] {};
2172
2173    byte[][] families = { fam1, fam2 };
2174
2175    // Setting up region
2176    this.region = initHRegion(tableName, method, CONF, families);
2177    try {
2178      // Put content
2179      Put put = new Put(row1);
2180      put.addColumn(fam1, qf1, val1);
2181      region.put(put);
2182      Threads.sleep(2);
2183
2184      put = new Put(row1);
2185      put.addColumn(fam1, qf1, val2);
2186      put.addColumn(fam2, qf1, val3);
2187      put.addColumn(fam2, qf2, val2);
2188      put.addColumn(fam2, qf3, val1);
2189      put.addColumn(fam1, qf3, val1);
2190      region.put(put);
2191
2192      // Multi-column delete
2193      Delete delete = new Delete(row1);
2194      delete.addColumn(fam1, qf1);
2195      delete.addColumn(fam2, qf1);
2196      delete.addColumn(fam1, qf3);
2197      boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(
2198          val2), delete);
2199      assertEquals(true, res);
2200
2201      Get get = new Get(row1);
2202      get.addColumn(fam1, qf1);
2203      get.addColumn(fam1, qf3);
2204      get.addColumn(fam2, qf2);
2205      Result r = region.get(get);
2206      assertEquals(2, r.size());
2207      assertArrayEquals(val1, r.getValue(fam1, qf1));
2208      assertArrayEquals(val2, r.getValue(fam2, qf2));
2209
2210      // Family delete
2211      delete = new Delete(row1);
2212      delete.addFamily(fam2);
2213      res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL, new BinaryComparator(emptyVal),
2214          delete);
2215      assertEquals(true, res);
2216
2217      get = new Get(row1);
2218      r = region.get(get);
2219      assertEquals(1, r.size());
2220      assertArrayEquals(val1, r.getValue(fam1, qf1));
2221
2222      // Row delete
2223      delete = new Delete(row1);
2224      res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
2225          delete);
2226      assertEquals(true, res);
2227      get = new Get(row1);
2228      r = region.get(get);
2229      assertEquals(0, r.size());
2230    } finally {
2231      HBaseTestingUtility.closeRegionAndWAL(this.region);
2232      this.region = null;
2233    }
2234  }
2235
2236  // ////////////////////////////////////////////////////////////////////////////
2237  // Delete tests
2238  // ////////////////////////////////////////////////////////////////////////////
2239  @Test
2240  public void testDelete_multiDeleteColumn() throws IOException {
2241    byte[] row1 = Bytes.toBytes("row1");
2242    byte[] fam1 = Bytes.toBytes("fam1");
2243    byte[] qual = Bytes.toBytes("qualifier");
2244    byte[] value = Bytes.toBytes("value");
2245
2246    Put put = new Put(row1);
2247    put.addColumn(fam1, qual, 1, value);
2248    put.addColumn(fam1, qual, 2, value);
2249
2250    this.region = initHRegion(tableName, method, CONF, fam1);
2251    try {
2252      region.put(put);
2253
2254      // We do support deleting more than 1 'latest' version
2255      Delete delete = new Delete(row1);
2256      delete.addColumn(fam1, qual);
2257      delete.addColumn(fam1, qual);
2258      region.delete(delete);
2259
2260      Get get = new Get(row1);
2261      get.addFamily(fam1);
2262      Result r = region.get(get);
2263      assertEquals(0, r.size());
2264    } finally {
2265      HBaseTestingUtility.closeRegionAndWAL(this.region);
2266      this.region = null;
2267    }
2268  }
2269
2270  @Test
2271  public void testDelete_CheckFamily() throws IOException {
2272    byte[] row1 = Bytes.toBytes("row1");
2273    byte[] fam1 = Bytes.toBytes("fam1");
2274    byte[] fam2 = Bytes.toBytes("fam2");
2275    byte[] fam3 = Bytes.toBytes("fam3");
2276    byte[] fam4 = Bytes.toBytes("fam4");
2277
2278    // Setting up region
2279    this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
2280    try {
2281      List<Cell> kvs = new ArrayList<>();
2282      kvs.add(new KeyValue(row1, fam4, null, null));
2283
2284      // testing existing family
2285      byte[] family = fam2;
2286      try {
2287        NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2288        deleteMap.put(family, kvs);
2289        region.delete(deleteMap, Durability.SYNC_WAL);
2290      } catch (Exception e) {
2291        fail("Family " + new String(family, StandardCharsets.UTF_8) + " does not exist");
2292      }
2293
2294      // testing non existing family
2295      boolean ok = false;
2296      family = fam4;
2297      try {
2298        NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2299        deleteMap.put(family, kvs);
2300        region.delete(deleteMap, Durability.SYNC_WAL);
2301      } catch (Exception e) {
2302        ok = true;
2303      }
2304      assertTrue("Family " + new String(family, StandardCharsets.UTF_8) + " does exist", ok);
2305    } finally {
2306      HBaseTestingUtility.closeRegionAndWAL(this.region);
2307      this.region = null;
2308    }
2309  }
2310
2311  @Test
2312  public void testDelete_mixed() throws IOException, InterruptedException {
2313    byte[] fam = Bytes.toBytes("info");
2314    byte[][] families = { fam };
2315    this.region = initHRegion(tableName, method, CONF, families);
2316    try {
2317      EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2318
2319      byte[] row = Bytes.toBytes("table_name");
2320      // column names
2321      byte[] serverinfo = Bytes.toBytes("serverinfo");
2322      byte[] splitA = Bytes.toBytes("splitA");
2323      byte[] splitB = Bytes.toBytes("splitB");
2324
2325      // add some data:
2326      Put put = new Put(row);
2327      put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2328      region.put(put);
2329
2330      put = new Put(row);
2331      put.addColumn(fam, splitB, Bytes.toBytes("reference_B"));
2332      region.put(put);
2333
2334      put = new Put(row);
2335      put.addColumn(fam, serverinfo, Bytes.toBytes("ip_address"));
2336      region.put(put);
2337
2338      // ok now delete a split:
2339      Delete delete = new Delete(row);
2340      delete.addColumns(fam, splitA);
2341      region.delete(delete);
2342
2343      // assert some things:
2344      Get get = new Get(row).addColumn(fam, serverinfo);
2345      Result result = region.get(get);
2346      assertEquals(1, result.size());
2347
2348      get = new Get(row).addColumn(fam, splitA);
2349      result = region.get(get);
2350      assertEquals(0, result.size());
2351
2352      get = new Get(row).addColumn(fam, splitB);
2353      result = region.get(get);
2354      assertEquals(1, result.size());
2355
2356      // Assert that after a delete, I can put.
2357      put = new Put(row);
2358      put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2359      region.put(put);
2360      get = new Get(row);
2361      result = region.get(get);
2362      assertEquals(3, result.size());
2363
2364      // Now delete all... then test I can add stuff back
2365      delete = new Delete(row);
2366      region.delete(delete);
2367      assertEquals(0, region.get(get).size());
2368
2369      region.put(new Put(row).addColumn(fam, splitA, Bytes.toBytes("reference_A")));
2370      result = region.get(get);
2371      assertEquals(1, result.size());
2372    } finally {
2373      HBaseTestingUtility.closeRegionAndWAL(this.region);
2374      this.region = null;
2375    }
2376  }
2377
2378  @Test
2379  public void testDeleteRowWithFutureTs() throws IOException {
2380    byte[] fam = Bytes.toBytes("info");
2381    byte[][] families = { fam };
2382    this.region = initHRegion(tableName, method, CONF, families);
2383    try {
2384      byte[] row = Bytes.toBytes("table_name");
2385      // column names
2386      byte[] serverinfo = Bytes.toBytes("serverinfo");
2387
2388      // add data in the far future
2389      Put put = new Put(row);
2390      put.addColumn(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value"));
2391      region.put(put);
2392
2393      // now delete something in the present
2394      Delete delete = new Delete(row);
2395      region.delete(delete);
2396
2397      // make sure we still see our data
2398      Get get = new Get(row).addColumn(fam, serverinfo);
2399      Result result = region.get(get);
2400      assertEquals(1, result.size());
2401
2402      // delete the future row
2403      delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3);
2404      region.delete(delete);
2405
2406      // make sure it is gone
2407      get = new Get(row).addColumn(fam, serverinfo);
2408      result = region.get(get);
2409      assertEquals(0, result.size());
2410    } finally {
2411      HBaseTestingUtility.closeRegionAndWAL(this.region);
2412      this.region = null;
2413    }
2414  }
2415
2416  /**
2417   * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
2418   * the actual timestamp
2419   */
2420  @Test
2421  public void testPutWithLatestTS() throws IOException {
2422    byte[] fam = Bytes.toBytes("info");
2423    byte[][] families = { fam };
2424    this.region = initHRegion(tableName, method, CONF, families);
2425    try {
2426      byte[] row = Bytes.toBytes("row1");
2427      // column names
2428      byte[] qual = Bytes.toBytes("qual");
2429
2430      // add data with LATEST_TIMESTAMP, put without WAL
2431      Put put = new Put(row);
2432      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2433      region.put(put);
2434
2435      // Make sure it shows up with an actual timestamp
2436      Get get = new Get(row).addColumn(fam, qual);
2437      Result result = region.get(get);
2438      assertEquals(1, result.size());
2439      Cell kv = result.rawCells()[0];
2440      LOG.info("Got: " + kv);
2441      assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2442          kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2443
2444      // Check same with WAL enabled (historically these took different
2445      // code paths, so check both)
2446      row = Bytes.toBytes("row2");
2447      put = new Put(row);
2448      put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2449      region.put(put);
2450
2451      // Make sure it shows up with an actual timestamp
2452      get = new Get(row).addColumn(fam, qual);
2453      result = region.get(get);
2454      assertEquals(1, result.size());
2455      kv = result.rawCells()[0];
2456      LOG.info("Got: " + kv);
2457      assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2458          kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2459    } finally {
2460      HBaseTestingUtility.closeRegionAndWAL(this.region);
2461      this.region = null;
2462    }
2463
2464  }
2465
2466  /**
2467   * Tests that there is server-side filtering for invalid timestamp upper
2468   * bound. Note that the timestamp lower bound is automatically handled for us
2469   * by the TTL field.
2470   */
2471  @Test
2472  public void testPutWithTsSlop() throws IOException {
2473    byte[] fam = Bytes.toBytes("info");
2474    byte[][] families = { fam };
2475
2476    // add data with a timestamp that is too recent for range. Ensure assert
2477    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
2478    this.region = initHRegion(tableName, method, CONF, families);
2479    boolean caughtExcep = false;
2480    try {
2481      try {
2482        // no TS specified == use latest. should not error
2483        region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), Bytes.toBytes("value")));
2484        // TS out of range. should error
2485        region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"),
2486            System.currentTimeMillis() + 2000, Bytes.toBytes("value")));
2487        fail("Expected IOE for TS out of configured timerange");
2488      } catch (FailedSanityCheckException ioe) {
2489        LOG.debug("Received expected exception", ioe);
2490        caughtExcep = true;
2491      }
2492      assertTrue("Should catch FailedSanityCheckException", caughtExcep);
2493    } finally {
2494      HBaseTestingUtility.closeRegionAndWAL(this.region);
2495      this.region = null;
2496    }
2497  }
2498
2499  @Test
2500  public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
2501    byte[] fam1 = Bytes.toBytes("columnA");
2502    byte[] fam2 = Bytes.toBytes("columnB");
2503    this.region = initHRegion(tableName, method, CONF, fam1, fam2);
2504    try {
2505      byte[] rowA = Bytes.toBytes("rowA");
2506      byte[] rowB = Bytes.toBytes("rowB");
2507
2508      byte[] value = Bytes.toBytes("value");
2509
2510      Delete delete = new Delete(rowA);
2511      delete.addFamily(fam1);
2512
2513      region.delete(delete);
2514
2515      // now create data.
2516      Put put = new Put(rowA);
2517      put.addColumn(fam2, null, value);
2518      region.put(put);
2519
2520      put = new Put(rowB);
2521      put.addColumn(fam1, null, value);
2522      put.addColumn(fam2, null, value);
2523      region.put(put);
2524
2525      Scan scan = new Scan();
2526      scan.addFamily(fam1).addFamily(fam2);
2527      InternalScanner s = region.getScanner(scan);
2528      List<Cell> results = new ArrayList<>();
2529      s.next(results);
2530      assertTrue(CellUtil.matchingRows(results.get(0), rowA));
2531
2532      results.clear();
2533      s.next(results);
2534      assertTrue(CellUtil.matchingRows(results.get(0), rowB));
2535    } finally {
2536      HBaseTestingUtility.closeRegionAndWAL(this.region);
2537      this.region = null;
2538    }
2539  }
2540
2541  @Test
2542  public void testDataInMemoryWithoutWAL() throws IOException {
2543    FileSystem fs = FileSystem.get(CONF);
2544    Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
2545    FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
2546    hLog.init();
2547    // This chunk creation is done throughout the code base. Do we want to move it into core?
2548    // It is missing from this test. W/o it we NPE.
2549    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
2550        COLUMN_FAMILY_BYTES);
2551
2552    Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2553      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1);
2554    final long originalSize = KeyValueUtil.length(originalCell);
2555
2556    Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2557      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx"));
2558    final long addSize = KeyValueUtil.length(addCell);
2559
2560    LOG.info("originalSize:" + originalSize
2561      + ", addSize:" + addSize);
2562    // start test. We expect that the addPut's durability will be replaced
2563    // by originalPut's durability.
2564
2565    // case 1:
2566    testDataInMemoryWithoutWAL(region,
2567            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2568            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2569            originalSize + addSize);
2570
2571    // case 2:
2572    testDataInMemoryWithoutWAL(region,
2573            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2574            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2575            originalSize + addSize);
2576
2577    // case 3:
2578    testDataInMemoryWithoutWAL(region,
2579            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2580            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2581            0);
2582
2583    // case 4:
2584    testDataInMemoryWithoutWAL(region,
2585            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2586            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2587            0);
2588  }
2589
2590  private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut,
2591          final Put addPut, long delta) throws IOException {
2592    final long initSize = region.getDataInMemoryWithoutWAL();
2593    // save normalCPHost and replaced by mockedCPHost
2594    RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
2595    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
2596    // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must
2597    // do below format (from Mockito doc).
2598    Mockito.doAnswer(new Answer() {
2599      @Override
2600      public Object answer(InvocationOnMock invocation) throws Throwable {
2601        MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0);
2602        mb.addOperationsFromCP(0, new Mutation[]{addPut});
2603        return null;
2604      }
2605    }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
2606    region.setCoprocessorHost(mockedCPHost);
2607    region.put(originalPut);
2608    region.setCoprocessorHost(normalCPHost);
2609    final long finalSize = region.getDataInMemoryWithoutWAL();
2610    assertEquals("finalSize:" + finalSize + ", initSize:"
2611      + initSize + ", delta:" + delta,finalSize, initSize + delta);
2612  }
2613
2614  @Test
2615  public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
2616    Delete delete = new Delete(row);
2617    delete.addColumns(fam1, qual1);
2618    doTestDelete_AndPostInsert(delete);
2619  }
2620
2621  @Test
2622  public void testaddFamily_PostInsert() throws IOException, InterruptedException {
2623    Delete delete = new Delete(row);
2624    delete.addFamily(fam1);
2625    doTestDelete_AndPostInsert(delete);
2626  }
2627
2628  public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
2629    this.region = initHRegion(tableName, method, CONF, fam1);
2630    try {
2631      EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2632      Put put = new Put(row);
2633      put.addColumn(fam1, qual1, value1);
2634      region.put(put);
2635
2636      // now delete the value:
2637      region.delete(delete);
2638
2639      // ok put data:
2640      put = new Put(row);
2641      put.addColumn(fam1, qual1, value2);
2642      region.put(put);
2643
2644      // ok get:
2645      Get get = new Get(row);
2646      get.addColumn(fam1, qual1);
2647
2648      Result r = region.get(get);
2649      assertEquals(1, r.size());
2650      assertArrayEquals(value2, r.getValue(fam1, qual1));
2651
2652      // next:
2653      Scan scan = new Scan(row);
2654      scan.addColumn(fam1, qual1);
2655      InternalScanner s = region.getScanner(scan);
2656
2657      List<Cell> results = new ArrayList<>();
2658      assertEquals(false, s.next(results));
2659      assertEquals(1, results.size());
2660      Cell kv = results.get(0);
2661
2662      assertArrayEquals(value2, CellUtil.cloneValue(kv));
2663      assertArrayEquals(fam1, CellUtil.cloneFamily(kv));
2664      assertArrayEquals(qual1, CellUtil.cloneQualifier(kv));
2665      assertArrayEquals(row, CellUtil.cloneRow(kv));
2666    } finally {
2667      HBaseTestingUtility.closeRegionAndWAL(this.region);
2668      this.region = null;
2669    }
2670  }
2671
2672  @Test
2673  public void testDelete_CheckTimestampUpdated() throws IOException {
2674    byte[] row1 = Bytes.toBytes("row1");
2675    byte[] col1 = Bytes.toBytes("col1");
2676    byte[] col2 = Bytes.toBytes("col2");
2677    byte[] col3 = Bytes.toBytes("col3");
2678
2679    // Setting up region
2680    this.region = initHRegion(tableName, method, CONF, fam1);
2681    try {
2682      // Building checkerList
2683      List<Cell> kvs = new ArrayList<>();
2684      kvs.add(new KeyValue(row1, fam1, col1, null));
2685      kvs.add(new KeyValue(row1, fam1, col2, null));
2686      kvs.add(new KeyValue(row1, fam1, col3, null));
2687
2688      NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2689      deleteMap.put(fam1, kvs);
2690      region.delete(deleteMap, Durability.SYNC_WAL);
2691
2692      // extract the key values out the memstore:
2693      // This is kinda hacky, but better than nothing...
2694      long now = System.currentTimeMillis();
2695      AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore;
2696      Cell firstCell = memstore.getActive().first();
2697      assertTrue(firstCell.getTimestamp() <= now);
2698      now = firstCell.getTimestamp();
2699      for (Cell cell : memstore.getActive().getCellSet()) {
2700        assertTrue(cell.getTimestamp() <= now);
2701        now = cell.getTimestamp();
2702      }
2703    } finally {
2704      HBaseTestingUtility.closeRegionAndWAL(this.region);
2705      this.region = null;
2706    }
2707  }
2708
2709  // ////////////////////////////////////////////////////////////////////////////
2710  // Get tests
2711  // ////////////////////////////////////////////////////////////////////////////
2712  @Test
2713  public void testGet_FamilyChecker() throws IOException {
2714    byte[] row1 = Bytes.toBytes("row1");
2715    byte[] fam1 = Bytes.toBytes("fam1");
2716    byte[] fam2 = Bytes.toBytes("False");
2717    byte[] col1 = Bytes.toBytes("col1");
2718
2719    // Setting up region
2720    this.region = initHRegion(tableName, method, CONF, fam1);
2721    try {
2722      Get get = new Get(row1);
2723      get.addColumn(fam2, col1);
2724
2725      // Test
2726      try {
2727        region.get(get);
2728      } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
2729        assertFalse(false);
2730        return;
2731      }
2732      assertFalse(true);
2733    } finally {
2734      HBaseTestingUtility.closeRegionAndWAL(this.region);
2735      this.region = null;
2736    }
2737  }
2738
2739  @Test
2740  public void testGet_Basic() throws IOException {
2741    byte[] row1 = Bytes.toBytes("row1");
2742    byte[] fam1 = Bytes.toBytes("fam1");
2743    byte[] col1 = Bytes.toBytes("col1");
2744    byte[] col2 = Bytes.toBytes("col2");
2745    byte[] col3 = Bytes.toBytes("col3");
2746    byte[] col4 = Bytes.toBytes("col4");
2747    byte[] col5 = Bytes.toBytes("col5");
2748
2749    // Setting up region
2750    this.region = initHRegion(tableName, method, CONF, fam1);
2751    try {
2752      // Add to memstore
2753      Put put = new Put(row1);
2754      put.addColumn(fam1, col1, null);
2755      put.addColumn(fam1, col2, null);
2756      put.addColumn(fam1, col3, null);
2757      put.addColumn(fam1, col4, null);
2758      put.addColumn(fam1, col5, null);
2759      region.put(put);
2760
2761      Get get = new Get(row1);
2762      get.addColumn(fam1, col2);
2763      get.addColumn(fam1, col4);
2764      // Expected result
2765      KeyValue kv1 = new KeyValue(row1, fam1, col2);
2766      KeyValue kv2 = new KeyValue(row1, fam1, col4);
2767      KeyValue[] expected = { kv1, kv2 };
2768
2769      // Test
2770      Result res = region.get(get);
2771      assertEquals(expected.length, res.size());
2772      for (int i = 0; i < res.size(); i++) {
2773        assertTrue(CellUtil.matchingRows(expected[i], res.rawCells()[i]));
2774        assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i]));
2775        assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i]));
2776      }
2777
2778      // Test using a filter on a Get
2779      Get g = new Get(row1);
2780      final int count = 2;
2781      g.setFilter(new ColumnCountGetFilter(count));
2782      res = region.get(g);
2783      assertEquals(count, res.size());
2784    } finally {
2785      HBaseTestingUtility.closeRegionAndWAL(this.region);
2786      this.region = null;
2787    }
2788  }
2789
2790  @Test
2791  public void testGet_Empty() throws IOException {
2792    byte[] row = Bytes.toBytes("row");
2793    byte[] fam = Bytes.toBytes("fam");
2794
2795    this.region = initHRegion(tableName, method, CONF, fam);
2796    try {
2797      Get get = new Get(row);
2798      get.addFamily(fam);
2799      Result r = region.get(get);
2800
2801      assertTrue(r.isEmpty());
2802    } finally {
2803      HBaseTestingUtility.closeRegionAndWAL(this.region);
2804      this.region = null;
2805    }
2806  }
2807
2808  @Test
2809  public void testGetWithFilter() throws IOException, InterruptedException {
2810    byte[] row1 = Bytes.toBytes("row1");
2811    byte[] fam1 = Bytes.toBytes("fam1");
2812    byte[] col1 = Bytes.toBytes("col1");
2813    byte[] value1 = Bytes.toBytes("value1");
2814    byte[] value2 = Bytes.toBytes("value2");
2815
2816    final int maxVersions = 3;
2817    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
2818    hcd.setMaxVersions(maxVersions);
2819    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker"));
2820    htd.addFamily(hcd);
2821    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2822    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
2823    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
2824    final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
2825    this.region = TEST_UTIL.createLocalHRegion(info, htd, wal);
2826
2827    try {
2828      // Put 4 version to memstore
2829      long ts = 0;
2830      Put put = new Put(row1, ts);
2831      put.addColumn(fam1, col1, value1);
2832      region.put(put);
2833      put = new Put(row1, ts + 1);
2834      put.addColumn(fam1, col1, Bytes.toBytes("filter1"));
2835      region.put(put);
2836      put = new Put(row1, ts + 2);
2837      put.addColumn(fam1, col1, Bytes.toBytes("filter2"));
2838      region.put(put);
2839      put = new Put(row1, ts + 3);
2840      put.addColumn(fam1, col1, value2);
2841      region.put(put);
2842
2843      Get get = new Get(row1);
2844      get.setMaxVersions();
2845      Result res = region.get(get);
2846      // Get 3 versions, the oldest version has gone from user view
2847      assertEquals(maxVersions, res.size());
2848
2849      get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value")));
2850      res = region.get(get);
2851      // When use value filter, the oldest version should still gone from user view and it
2852      // should only return one key vaule
2853      assertEquals(1, res.size());
2854      assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2855      assertEquals(ts + 3, res.rawCells()[0].getTimestamp());
2856
2857      region.flush(true);
2858      region.compact(true);
2859      Thread.sleep(1000);
2860      res = region.get(get);
2861      // After flush and compact, the result should be consistent with previous result
2862      assertEquals(1, res.size());
2863      assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2864    } finally {
2865      HBaseTestingUtility.closeRegionAndWAL(this.region);
2866      this.region = null;
2867    }
2868  }
2869
2870  // ////////////////////////////////////////////////////////////////////////////
2871  // Scanner tests
2872  // ////////////////////////////////////////////////////////////////////////////
2873  @Test
2874  public void testGetScanner_WithOkFamilies() throws IOException {
2875    byte[] fam1 = Bytes.toBytes("fam1");
2876    byte[] fam2 = Bytes.toBytes("fam2");
2877
2878    byte[][] families = { fam1, fam2 };
2879
2880    // Setting up region
2881    this.region = initHRegion(tableName, method, CONF, families);
2882    try {
2883      Scan scan = new Scan();
2884      scan.addFamily(fam1);
2885      scan.addFamily(fam2);
2886      try {
2887        region.getScanner(scan);
2888      } catch (Exception e) {
2889        assertTrue("Families could not be found in Region", false);
2890      }
2891    } finally {
2892      HBaseTestingUtility.closeRegionAndWAL(this.region);
2893      this.region = null;
2894    }
2895  }
2896
2897  @Test
2898  public void testGetScanner_WithNotOkFamilies() throws IOException {
2899    byte[] fam1 = Bytes.toBytes("fam1");
2900    byte[] fam2 = Bytes.toBytes("fam2");
2901
2902    byte[][] families = { fam1 };
2903
2904    // Setting up region
2905    this.region = initHRegion(tableName, method, CONF, families);
2906    try {
2907      Scan scan = new Scan();
2908      scan.addFamily(fam2);
2909      boolean ok = false;
2910      try {
2911        region.getScanner(scan);
2912      } catch (Exception e) {
2913        ok = true;
2914      }
2915      assertTrue("Families could not be found in Region", ok);
2916    } finally {
2917      HBaseTestingUtility.closeRegionAndWAL(this.region);
2918      this.region = null;
2919    }
2920  }
2921
2922  @Test
2923  public void testGetScanner_WithNoFamilies() throws IOException {
2924    byte[] row1 = Bytes.toBytes("row1");
2925    byte[] fam1 = Bytes.toBytes("fam1");
2926    byte[] fam2 = Bytes.toBytes("fam2");
2927    byte[] fam3 = Bytes.toBytes("fam3");
2928    byte[] fam4 = Bytes.toBytes("fam4");
2929
2930    byte[][] families = { fam1, fam2, fam3, fam4 };
2931
2932    // Setting up region
2933    this.region = initHRegion(tableName, method, CONF, families);
2934    try {
2935
2936      // Putting data in Region
2937      Put put = new Put(row1);
2938      put.addColumn(fam1, null, null);
2939      put.addColumn(fam2, null, null);
2940      put.addColumn(fam3, null, null);
2941      put.addColumn(fam4, null, null);
2942      region.put(put);
2943
2944      Scan scan = null;
2945      HRegion.RegionScannerImpl is = null;
2946
2947      // Testing to see how many scanners that is produced by getScanner,
2948      // starting
2949      // with known number, 2 - current = 1
2950      scan = new Scan();
2951      scan.addFamily(fam2);
2952      scan.addFamily(fam4);
2953      is = region.getScanner(scan);
2954      assertEquals(1, is.storeHeap.getHeap().size());
2955
2956      scan = new Scan();
2957      is = region.getScanner(scan);
2958      assertEquals(families.length - 1, is.storeHeap.getHeap().size());
2959    } finally {
2960      HBaseTestingUtility.closeRegionAndWAL(this.region);
2961      this.region = null;
2962    }
2963  }
2964
2965  /**
2966   * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
2967   *
2968   * @throws IOException
2969   */
2970  @Test
2971  public void testGetScanner_WithRegionClosed() throws IOException {
2972    byte[] fam1 = Bytes.toBytes("fam1");
2973    byte[] fam2 = Bytes.toBytes("fam2");
2974
2975    byte[][] families = { fam1, fam2 };
2976
2977    // Setting up region
2978    try {
2979      this.region = initHRegion(tableName, method, CONF, families);
2980    } catch (IOException e) {
2981      e.printStackTrace();
2982      fail("Got IOException during initHRegion, " + e.getMessage());
2983    }
2984    try {
2985      region.closed.set(true);
2986      try {
2987        region.getScanner(null);
2988        fail("Expected to get an exception during getScanner on a region that is closed");
2989      } catch (NotServingRegionException e) {
2990        // this is the correct exception that is expected
2991      } catch (IOException e) {
2992        fail("Got wrong type of exception - should be a NotServingRegionException, " +
2993            "but was an IOException: "
2994            + e.getMessage());
2995      }
2996    } finally {
2997      HBaseTestingUtility.closeRegionAndWAL(this.region);
2998      this.region = null;
2999    }
3000  }
3001
3002  @Test
3003  public void testRegionScanner_Next() throws IOException {
3004    byte[] row1 = Bytes.toBytes("row1");
3005    byte[] row2 = Bytes.toBytes("row2");
3006    byte[] fam1 = Bytes.toBytes("fam1");
3007    byte[] fam2 = Bytes.toBytes("fam2");
3008    byte[] fam3 = Bytes.toBytes("fam3");
3009    byte[] fam4 = Bytes.toBytes("fam4");
3010
3011    byte[][] families = { fam1, fam2, fam3, fam4 };
3012    long ts = System.currentTimeMillis();
3013
3014    // Setting up region
3015    this.region = initHRegion(tableName, method, CONF, families);
3016    try {
3017      // Putting data in Region
3018      Put put = null;
3019      put = new Put(row1);
3020      put.addColumn(fam1, (byte[]) null, ts, null);
3021      put.addColumn(fam2, (byte[]) null, ts, null);
3022      put.addColumn(fam3, (byte[]) null, ts, null);
3023      put.addColumn(fam4, (byte[]) null, ts, null);
3024      region.put(put);
3025
3026      put = new Put(row2);
3027      put.addColumn(fam1, (byte[]) null, ts, null);
3028      put.addColumn(fam2, (byte[]) null, ts, null);
3029      put.addColumn(fam3, (byte[]) null, ts, null);
3030      put.addColumn(fam4, (byte[]) null, ts, null);
3031      region.put(put);
3032
3033      Scan scan = new Scan();
3034      scan.addFamily(fam2);
3035      scan.addFamily(fam4);
3036      InternalScanner is = region.getScanner(scan);
3037
3038      List<Cell> res = null;
3039
3040      // Result 1
3041      List<Cell> expected1 = new ArrayList<>();
3042      expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
3043      expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
3044
3045      res = new ArrayList<>();
3046      is.next(res);
3047      for (int i = 0; i < res.size(); i++) {
3048        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected1.get(i), res.get(i)));
3049      }
3050
3051      // Result 2
3052      List<Cell> expected2 = new ArrayList<>();
3053      expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
3054      expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
3055
3056      res = new ArrayList<>();
3057      is.next(res);
3058      for (int i = 0; i < res.size(); i++) {
3059        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected2.get(i), res.get(i)));
3060      }
3061    } finally {
3062      HBaseTestingUtility.closeRegionAndWAL(this.region);
3063      this.region = null;
3064    }
3065  }
3066
3067  @Test
3068  public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException {
3069    byte[] row1 = Bytes.toBytes("row1");
3070    byte[] qf1 = Bytes.toBytes("qualifier1");
3071    byte[] qf2 = Bytes.toBytes("qualifier2");
3072    byte[] fam1 = Bytes.toBytes("fam1");
3073    byte[][] families = { fam1 };
3074
3075    long ts1 = System.currentTimeMillis();
3076    long ts2 = ts1 + 1;
3077    long ts3 = ts1 + 2;
3078
3079    // Setting up region
3080    this.region = initHRegion(tableName, method, CONF, families);
3081    try {
3082      // Putting data in Region
3083      Put put = null;
3084      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3085      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3086      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3087
3088      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3089      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3090      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3091
3092      put = new Put(row1);
3093      put.add(kv13);
3094      put.add(kv12);
3095      put.add(kv11);
3096      put.add(kv23);
3097      put.add(kv22);
3098      put.add(kv21);
3099      region.put(put);
3100
3101      // Expected
3102      List<Cell> expected = new ArrayList<>();
3103      expected.add(kv13);
3104      expected.add(kv12);
3105
3106      Scan scan = new Scan(row1);
3107      scan.addColumn(fam1, qf1);
3108      scan.setMaxVersions(MAX_VERSIONS);
3109      List<Cell> actual = new ArrayList<>();
3110      InternalScanner scanner = region.getScanner(scan);
3111
3112      boolean hasNext = scanner.next(actual);
3113      assertEquals(false, hasNext);
3114
3115      // Verify result
3116      for (int i = 0; i < expected.size(); i++) {
3117        assertEquals(expected.get(i), actual.get(i));
3118      }
3119    } finally {
3120      HBaseTestingUtility.closeRegionAndWAL(this.region);
3121      this.region = null;
3122    }
3123  }
3124
3125  @Test
3126  public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException {
3127    byte[] row1 = Bytes.toBytes("row1");
3128    byte[] qf1 = Bytes.toBytes("qualifier1");
3129    byte[] qf2 = Bytes.toBytes("qualifier2");
3130    byte[] fam1 = Bytes.toBytes("fam1");
3131    byte[][] families = { fam1 };
3132
3133    long ts1 = 1; // System.currentTimeMillis();
3134    long ts2 = ts1 + 1;
3135    long ts3 = ts1 + 2;
3136
3137    // Setting up region
3138    this.region = initHRegion(tableName, method, CONF, families);
3139    try {
3140      // Putting data in Region
3141      Put put = null;
3142      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3143      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3144      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3145
3146      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3147      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3148      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3149
3150      put = new Put(row1);
3151      put.add(kv13);
3152      put.add(kv12);
3153      put.add(kv11);
3154      put.add(kv23);
3155      put.add(kv22);
3156      put.add(kv21);
3157      region.put(put);
3158      region.flush(true);
3159
3160      // Expected
3161      List<Cell> expected = new ArrayList<>();
3162      expected.add(kv13);
3163      expected.add(kv12);
3164      expected.add(kv23);
3165      expected.add(kv22);
3166
3167      Scan scan = new Scan(row1);
3168      scan.addColumn(fam1, qf1);
3169      scan.addColumn(fam1, qf2);
3170      scan.setMaxVersions(MAX_VERSIONS);
3171      List<Cell> actual = new ArrayList<>();
3172      InternalScanner scanner = region.getScanner(scan);
3173
3174      boolean hasNext = scanner.next(actual);
3175      assertEquals(false, hasNext);
3176
3177      // Verify result
3178      for (int i = 0; i < expected.size(); i++) {
3179        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3180      }
3181    } finally {
3182      HBaseTestingUtility.closeRegionAndWAL(this.region);
3183      this.region = null;
3184    }
3185  }
3186
3187  @Test
3188  public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws
3189      IOException {
3190    byte[] row1 = Bytes.toBytes("row1");
3191    byte[] fam1 = Bytes.toBytes("fam1");
3192    byte[][] families = { fam1 };
3193    byte[] qf1 = Bytes.toBytes("qualifier1");
3194    byte[] qf2 = Bytes.toBytes("qualifier2");
3195
3196    long ts1 = 1;
3197    long ts2 = ts1 + 1;
3198    long ts3 = ts1 + 2;
3199    long ts4 = ts1 + 3;
3200
3201    // Setting up region
3202    this.region = initHRegion(tableName, method, CONF, families);
3203    try {
3204      // Putting data in Region
3205      KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3206      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3207      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3208      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3209
3210      KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3211      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3212      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3213      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3214
3215      Put put = null;
3216      put = new Put(row1);
3217      put.add(kv14);
3218      put.add(kv24);
3219      region.put(put);
3220      region.flush(true);
3221
3222      put = new Put(row1);
3223      put.add(kv23);
3224      put.add(kv13);
3225      region.put(put);
3226      region.flush(true);
3227
3228      put = new Put(row1);
3229      put.add(kv22);
3230      put.add(kv12);
3231      region.put(put);
3232      region.flush(true);
3233
3234      put = new Put(row1);
3235      put.add(kv21);
3236      put.add(kv11);
3237      region.put(put);
3238
3239      // Expected
3240      List<Cell> expected = new ArrayList<>();
3241      expected.add(kv14);
3242      expected.add(kv13);
3243      expected.add(kv12);
3244      expected.add(kv24);
3245      expected.add(kv23);
3246      expected.add(kv22);
3247
3248      Scan scan = new Scan(row1);
3249      scan.addColumn(fam1, qf1);
3250      scan.addColumn(fam1, qf2);
3251      int versions = 3;
3252      scan.setMaxVersions(versions);
3253      List<Cell> actual = new ArrayList<>();
3254      InternalScanner scanner = region.getScanner(scan);
3255
3256      boolean hasNext = scanner.next(actual);
3257      assertEquals(false, hasNext);
3258
3259      // Verify result
3260      for (int i = 0; i < expected.size(); i++) {
3261        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3262      }
3263    } finally {
3264      HBaseTestingUtility.closeRegionAndWAL(this.region);
3265      this.region = null;
3266    }
3267  }
3268
3269  @Test
3270  public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException {
3271    byte[] row1 = Bytes.toBytes("row1");
3272    byte[] qf1 = Bytes.toBytes("qualifier1");
3273    byte[] qf2 = Bytes.toBytes("qualifier2");
3274    byte[] fam1 = Bytes.toBytes("fam1");
3275    byte[][] families = { fam1 };
3276
3277    long ts1 = System.currentTimeMillis();
3278    long ts2 = ts1 + 1;
3279    long ts3 = ts1 + 2;
3280
3281    // Setting up region
3282    this.region = initHRegion(tableName, method, CONF, families);
3283    try {
3284      // Putting data in Region
3285      Put put = null;
3286      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3287      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3288      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3289
3290      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3291      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3292      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3293
3294      put = new Put(row1);
3295      put.add(kv13);
3296      put.add(kv12);
3297      put.add(kv11);
3298      put.add(kv23);
3299      put.add(kv22);
3300      put.add(kv21);
3301      region.put(put);
3302
3303      // Expected
3304      List<Cell> expected = new ArrayList<>();
3305      expected.add(kv13);
3306      expected.add(kv12);
3307      expected.add(kv23);
3308      expected.add(kv22);
3309
3310      Scan scan = new Scan(row1);
3311      scan.addFamily(fam1);
3312      scan.setMaxVersions(MAX_VERSIONS);
3313      List<Cell> actual = new ArrayList<>();
3314      InternalScanner scanner = region.getScanner(scan);
3315
3316      boolean hasNext = scanner.next(actual);
3317      assertEquals(false, hasNext);
3318
3319      // Verify result
3320      for (int i = 0; i < expected.size(); i++) {
3321        assertEquals(expected.get(i), actual.get(i));
3322      }
3323    } finally {
3324      HBaseTestingUtility.closeRegionAndWAL(this.region);
3325      this.region = null;
3326    }
3327  }
3328
3329  @Test
3330  public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException {
3331    byte[] row1 = Bytes.toBytes("row1");
3332    byte[] qf1 = Bytes.toBytes("qualifier1");
3333    byte[] qf2 = Bytes.toBytes("qualifier2");
3334    byte[] fam1 = Bytes.toBytes("fam1");
3335
3336    long ts1 = 1; // System.currentTimeMillis();
3337    long ts2 = ts1 + 1;
3338    long ts3 = ts1 + 2;
3339
3340    // Setting up region
3341    this.region = initHRegion(tableName, method, CONF, fam1);
3342    try {
3343      // Putting data in Region
3344      Put put = null;
3345      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3346      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3347      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3348
3349      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3350      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3351      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3352
3353      put = new Put(row1);
3354      put.add(kv13);
3355      put.add(kv12);
3356      put.add(kv11);
3357      put.add(kv23);
3358      put.add(kv22);
3359      put.add(kv21);
3360      region.put(put);
3361      region.flush(true);
3362
3363      // Expected
3364      List<Cell> expected = new ArrayList<>();
3365      expected.add(kv13);
3366      expected.add(kv12);
3367      expected.add(kv23);
3368      expected.add(kv22);
3369
3370      Scan scan = new Scan(row1);
3371      scan.addFamily(fam1);
3372      scan.setMaxVersions(MAX_VERSIONS);
3373      List<Cell> actual = new ArrayList<>();
3374      InternalScanner scanner = region.getScanner(scan);
3375
3376      boolean hasNext = scanner.next(actual);
3377      assertEquals(false, hasNext);
3378
3379      // Verify result
3380      for (int i = 0; i < expected.size(); i++) {
3381        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3382      }
3383    } finally {
3384      HBaseTestingUtility.closeRegionAndWAL(this.region);
3385      this.region = null;
3386    }
3387  }
3388
3389  @Test
3390  public void testScanner_StopRow1542() throws IOException {
3391    byte[] family = Bytes.toBytes("testFamily");
3392    this.region = initHRegion(tableName, method, CONF, family);
3393    try {
3394      byte[] row1 = Bytes.toBytes("row111");
3395      byte[] row2 = Bytes.toBytes("row222");
3396      byte[] row3 = Bytes.toBytes("row333");
3397      byte[] row4 = Bytes.toBytes("row444");
3398      byte[] row5 = Bytes.toBytes("row555");
3399
3400      byte[] col1 = Bytes.toBytes("Pub111");
3401      byte[] col2 = Bytes.toBytes("Pub222");
3402
3403      Put put = new Put(row1);
3404      put.addColumn(family, col1, Bytes.toBytes(10L));
3405      region.put(put);
3406
3407      put = new Put(row2);
3408      put.addColumn(family, col1, Bytes.toBytes(15L));
3409      region.put(put);
3410
3411      put = new Put(row3);
3412      put.addColumn(family, col2, Bytes.toBytes(20L));
3413      region.put(put);
3414
3415      put = new Put(row4);
3416      put.addColumn(family, col2, Bytes.toBytes(30L));
3417      region.put(put);
3418
3419      put = new Put(row5);
3420      put.addColumn(family, col1, Bytes.toBytes(40L));
3421      region.put(put);
3422
3423      Scan scan = new Scan(row3, row4);
3424      scan.setMaxVersions();
3425      scan.addColumn(family, col1);
3426      InternalScanner s = region.getScanner(scan);
3427
3428      List<Cell> results = new ArrayList<>();
3429      assertEquals(false, s.next(results));
3430      assertEquals(0, results.size());
3431    } finally {
3432      HBaseTestingUtility.closeRegionAndWAL(this.region);
3433      this.region = null;
3434    }
3435  }
3436
3437  @Test
3438  public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
3439    byte[] row1 = Bytes.toBytes("row1");
3440    byte[] fam1 = Bytes.toBytes("fam1");
3441    byte[] qf1 = Bytes.toBytes("qualifier1");
3442    byte[] qf2 = Bytes.toBytes("quateslifier2");
3443
3444    long ts1 = 1;
3445    long ts2 = ts1 + 1;
3446    long ts3 = ts1 + 2;
3447    long ts4 = ts1 + 3;
3448
3449    // Setting up region
3450    this.region = initHRegion(tableName, method, CONF, fam1);
3451    try {
3452      // Putting data in Region
3453      KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3454      KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3455      KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3456      KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3457
3458      KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3459      KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3460      KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3461      KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3462
3463      Put put = null;
3464      put = new Put(row1);
3465      put.add(kv14);
3466      put.add(kv24);
3467      region.put(put);
3468      region.flush(true);
3469
3470      put = new Put(row1);
3471      put.add(kv23);
3472      put.add(kv13);
3473      region.put(put);
3474      region.flush(true);
3475
3476      put = new Put(row1);
3477      put.add(kv22);
3478      put.add(kv12);
3479      region.put(put);
3480      region.flush(true);
3481
3482      put = new Put(row1);
3483      put.add(kv21);
3484      put.add(kv11);
3485      region.put(put);
3486
3487      // Expected
3488      List<KeyValue> expected = new ArrayList<>();
3489      expected.add(kv14);
3490      expected.add(kv13);
3491      expected.add(kv12);
3492      expected.add(kv24);
3493      expected.add(kv23);
3494      expected.add(kv22);
3495
3496      Scan scan = new Scan(row1);
3497      int versions = 3;
3498      scan.setMaxVersions(versions);
3499      List<Cell> actual = new ArrayList<>();
3500      InternalScanner scanner = region.getScanner(scan);
3501
3502      boolean hasNext = scanner.next(actual);
3503      assertEquals(false, hasNext);
3504
3505      // Verify result
3506      for (int i = 0; i < expected.size(); i++) {
3507        assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3508      }
3509    } finally {
3510      HBaseTestingUtility.closeRegionAndWAL(this.region);
3511      this.region = null;
3512    }
3513  }
3514
3515  /**
3516   * Added for HBASE-5416
3517   *
3518   * Here we test scan optimization when only subset of CFs are used in filter
3519   * conditions.
3520   */
3521  @Test
3522  public void testScanner_JoinedScanners() throws IOException {
3523    byte[] cf_essential = Bytes.toBytes("essential");
3524    byte[] cf_joined = Bytes.toBytes("joined");
3525    byte[] cf_alpha = Bytes.toBytes("alpha");
3526    this.region = initHRegion(tableName, method, CONF, cf_essential, cf_joined, cf_alpha);
3527    try {
3528      byte[] row1 = Bytes.toBytes("row1");
3529      byte[] row2 = Bytes.toBytes("row2");
3530      byte[] row3 = Bytes.toBytes("row3");
3531
3532      byte[] col_normal = Bytes.toBytes("d");
3533      byte[] col_alpha = Bytes.toBytes("a");
3534
3535      byte[] filtered_val = Bytes.toBytes(3);
3536
3537      Put put = new Put(row1);
3538      put.addColumn(cf_essential, col_normal, Bytes.toBytes(1));
3539      put.addColumn(cf_joined, col_alpha, Bytes.toBytes(1));
3540      region.put(put);
3541
3542      put = new Put(row2);
3543      put.addColumn(cf_essential, col_alpha, Bytes.toBytes(2));
3544      put.addColumn(cf_joined, col_normal, Bytes.toBytes(2));
3545      put.addColumn(cf_alpha, col_alpha, Bytes.toBytes(2));
3546      region.put(put);
3547
3548      put = new Put(row3);
3549      put.addColumn(cf_essential, col_normal, filtered_val);
3550      put.addColumn(cf_joined, col_normal, filtered_val);
3551      region.put(put);
3552
3553      // Check two things:
3554      // 1. result list contains expected values
3555      // 2. result list is sorted properly
3556
3557      Scan scan = new Scan();
3558      Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
3559          CompareOp.NOT_EQUAL, filtered_val);
3560      scan.setFilter(filter);
3561      scan.setLoadColumnFamiliesOnDemand(true);
3562      InternalScanner s = region.getScanner(scan);
3563
3564      List<Cell> results = new ArrayList<>();
3565      assertTrue(s.next(results));
3566      assertEquals(1, results.size());
3567      results.clear();
3568
3569      assertTrue(s.next(results));
3570      assertEquals(3, results.size());
3571      assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha));
3572      assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential));
3573      assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined));
3574      results.clear();
3575
3576      assertFalse(s.next(results));
3577      assertEquals(0, results.size());
3578    } finally {
3579      HBaseTestingUtility.closeRegionAndWAL(this.region);
3580      this.region = null;
3581    }
3582  }
3583
3584  /**
3585   * HBASE-5416
3586   *
3587   * Test case when scan limits amount of KVs returned on each next() call.
3588   */
3589  @Test
3590  public void testScanner_JoinedScannersWithLimits() throws IOException {
3591    final byte[] cf_first = Bytes.toBytes("first");
3592    final byte[] cf_second = Bytes.toBytes("second");
3593
3594    this.region = initHRegion(tableName, method, CONF, cf_first, cf_second);
3595    try {
3596      final byte[] col_a = Bytes.toBytes("a");
3597      final byte[] col_b = Bytes.toBytes("b");
3598
3599      Put put;
3600
3601      for (int i = 0; i < 10; i++) {
3602        put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
3603        put.addColumn(cf_first, col_a, Bytes.toBytes(i));
3604        if (i < 5) {
3605          put.addColumn(cf_first, col_b, Bytes.toBytes(i));
3606          put.addColumn(cf_second, col_a, Bytes.toBytes(i));
3607          put.addColumn(cf_second, col_b, Bytes.toBytes(i));
3608        }
3609        region.put(put);
3610      }
3611
3612      Scan scan = new Scan();
3613      scan.setLoadColumnFamiliesOnDemand(true);
3614      Filter bogusFilter = new FilterBase() {
3615        @Override
3616        public ReturnCode filterCell(final Cell ignored) throws IOException {
3617          return ReturnCode.INCLUDE;
3618        }
3619        @Override
3620        public boolean isFamilyEssential(byte[] name) {
3621          return Bytes.equals(name, cf_first);
3622        }
3623      };
3624
3625      scan.setFilter(bogusFilter);
3626      InternalScanner s = region.getScanner(scan);
3627
3628      // Our data looks like this:
3629      // r0: first:a, first:b, second:a, second:b
3630      // r1: first:a, first:b, second:a, second:b
3631      // r2: first:a, first:b, second:a, second:b
3632      // r3: first:a, first:b, second:a, second:b
3633      // r4: first:a, first:b, second:a, second:b
3634      // r5: first:a
3635      // r6: first:a
3636      // r7: first:a
3637      // r8: first:a
3638      // r9: first:a
3639
3640      // But due to next's limit set to 3, we should get this:
3641      // r0: first:a, first:b, second:a
3642      // r0: second:b
3643      // r1: first:a, first:b, second:a
3644      // r1: second:b
3645      // r2: first:a, first:b, second:a
3646      // r2: second:b
3647      // r3: first:a, first:b, second:a
3648      // r3: second:b
3649      // r4: first:a, first:b, second:a
3650      // r4: second:b
3651      // r5: first:a
3652      // r6: first:a
3653      // r7: first:a
3654      // r8: first:a
3655      // r9: first:a
3656
3657      List<Cell> results = new ArrayList<>();
3658      int index = 0;
3659      ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build();
3660      while (true) {
3661        boolean more = s.next(results, scannerContext);
3662        if ((index >> 1) < 5) {
3663          if (index % 2 == 0) {
3664            assertEquals(3, results.size());
3665          } else {
3666            assertEquals(1, results.size());
3667          }
3668        } else {
3669          assertEquals(1, results.size());
3670        }
3671        results.clear();
3672        index++;
3673        if (!more) {
3674          break;
3675        }
3676      }
3677    } finally {
3678      HBaseTestingUtility.closeRegionAndWAL(this.region);
3679      this.region = null;
3680    }
3681  }
3682
3683  /**
3684   * Write an HFile block full with Cells whose qualifier that are identical between
3685   * 0 and Short.MAX_VALUE. See HBASE-13329.
3686   * @throws Exception
3687   */
3688  @Test
3689  public void testLongQualifier() throws Exception {
3690    byte[] family = Bytes.toBytes("family");
3691    this.region = initHRegion(tableName, method, CONF, family);
3692    byte[] q = new byte[Short.MAX_VALUE+2];
3693    Arrays.fill(q, 0, q.length-1, (byte)42);
3694    for (byte i=0; i<10; i++) {
3695      Put p = new Put(Bytes.toBytes("row"));
3696      // qualifiers that differ past Short.MAX_VALUE
3697      q[q.length-1]=i;
3698      p.addColumn(family, q, q);
3699      region.put(p);
3700    }
3701    region.flush(false);
3702    HBaseTestingUtility.closeRegionAndWAL(this.region);
3703    this.region = null;
3704  }
3705
3706  /**
3707   * Flushes the cache in a thread while scanning. The tests verify that the
3708   * scan is coherent - e.g. the returned results are always of the same or
3709   * later update as the previous results.
3710   *
3711   * @throws IOException
3712   *           scan / compact
3713   * @throws InterruptedException
3714   *           thread join
3715   */
3716  @Test
3717  public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
3718    byte[] family = Bytes.toBytes("family");
3719    int numRows = 1000;
3720    int flushAndScanInterval = 10;
3721    int compactInterval = 10 * flushAndScanInterval;
3722
3723    this.region = initHRegion(tableName, method, CONF, family);
3724    FlushThread flushThread = new FlushThread();
3725    try {
3726      flushThread.start();
3727
3728      Scan scan = new Scan();
3729      scan.addFamily(family);
3730      scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOp.EQUAL,
3731          new BinaryComparator(Bytes.toBytes(5L))));
3732
3733      int expectedCount = 0;
3734      List<Cell> res = new ArrayList<>();
3735
3736      boolean toggle = true;
3737      for (long i = 0; i < numRows; i++) {
3738        Put put = new Put(Bytes.toBytes(i));
3739        put.setDurability(Durability.SKIP_WAL);
3740        put.addColumn(family, qual1, Bytes.toBytes(i % 10));
3741        region.put(put);
3742
3743        if (i != 0 && i % compactInterval == 0) {
3744          LOG.debug("iteration = " + i+ " ts="+System.currentTimeMillis());
3745          region.compact(true);
3746        }
3747
3748        if (i % 10 == 5L) {
3749          expectedCount++;
3750        }
3751
3752        if (i != 0 && i % flushAndScanInterval == 0) {
3753          res.clear();
3754          InternalScanner scanner = region.getScanner(scan);
3755          if (toggle) {
3756            flushThread.flush();
3757          }
3758          while (scanner.next(res))
3759            ;
3760          if (!toggle) {
3761            flushThread.flush();
3762          }
3763          assertEquals("toggle="+toggle+"i=" + i + " ts="+System.currentTimeMillis(),
3764              expectedCount, res.size());
3765          toggle = !toggle;
3766        }
3767      }
3768
3769    } finally {
3770      try {
3771        flushThread.done();
3772        flushThread.join();
3773        flushThread.checkNoError();
3774      } catch (InterruptedException ie) {
3775        LOG.warn("Caught exception when joining with flushThread", ie);
3776      }
3777      HBaseTestingUtility.closeRegionAndWAL(this.region);
3778      this.region = null;
3779    }
3780  }
3781
3782  protected class FlushThread extends Thread {
3783    private volatile boolean done;
3784    private Throwable error = null;
3785
3786    FlushThread() {
3787      super("FlushThread");
3788    }
3789
3790    public void done() {
3791      done = true;
3792      synchronized (this) {
3793        interrupt();
3794      }
3795    }
3796
3797    public void checkNoError() {
3798      if (error != null) {
3799        assertNull(error);
3800      }
3801    }
3802
3803    @Override
3804    public void run() {
3805      done = false;
3806      while (!done) {
3807        synchronized (this) {
3808          try {
3809            wait();
3810          } catch (InterruptedException ignored) {
3811            if (done) {
3812              break;
3813            }
3814          }
3815        }
3816        try {
3817          region.flush(true);
3818        } catch (IOException e) {
3819          if (!done) {
3820            LOG.error("Error while flushing cache", e);
3821            error = e;
3822          }
3823          break;
3824        } catch (Throwable t) {
3825          LOG.error("Uncaught exception", t);
3826          throw t;
3827        }
3828      }
3829    }
3830
3831    public void flush() {
3832      synchronized (this) {
3833        notify();
3834      }
3835    }
3836  }
3837
3838  /**
3839   * Writes very wide records and scans for the latest every time.. Flushes and
3840   * compacts the region every now and then to keep things realistic.
3841   *
3842   * @throws IOException
3843   *           by flush / scan / compaction
3844   * @throws InterruptedException
3845   *           when joining threads
3846   */
3847  @Test
3848  public void testWritesWhileScanning() throws IOException, InterruptedException {
3849    int testCount = 100;
3850    int numRows = 1;
3851    int numFamilies = 10;
3852    int numQualifiers = 100;
3853    int flushInterval = 7;
3854    int compactInterval = 5 * flushInterval;
3855    byte[][] families = new byte[numFamilies][];
3856    for (int i = 0; i < numFamilies; i++) {
3857      families[i] = Bytes.toBytes("family" + i);
3858    }
3859    byte[][] qualifiers = new byte[numQualifiers][];
3860    for (int i = 0; i < numQualifiers; i++) {
3861      qualifiers[i] = Bytes.toBytes("qual" + i);
3862    }
3863
3864    this.region = initHRegion(tableName, method, CONF, families);
3865    FlushThread flushThread = new FlushThread();
3866    PutThread putThread = new PutThread(numRows, families, qualifiers);
3867    try {
3868      putThread.start();
3869      putThread.waitForFirstPut();
3870
3871      flushThread.start();
3872
3873      Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
3874
3875      int expectedCount = numFamilies * numQualifiers;
3876      List<Cell> res = new ArrayList<>();
3877
3878      long prevTimestamp = 0L;
3879      for (int i = 0; i < testCount; i++) {
3880
3881        if (i != 0 && i % compactInterval == 0) {
3882          region.compact(true);
3883          for (HStore store : region.getStores()) {
3884            store.closeAndArchiveCompactedFiles();
3885          }
3886        }
3887
3888        if (i != 0 && i % flushInterval == 0) {
3889          flushThread.flush();
3890        }
3891
3892        boolean previousEmpty = res.isEmpty();
3893        res.clear();
3894        InternalScanner scanner = region.getScanner(scan);
3895        while (scanner.next(res))
3896          ;
3897        if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
3898          assertEquals("i=" + i, expectedCount, res.size());
3899          long timestamp = res.get(0).getTimestamp();
3900          assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
3901              timestamp >= prevTimestamp);
3902          prevTimestamp = timestamp;
3903        }
3904      }
3905
3906      putThread.done();
3907
3908      region.flush(true);
3909
3910    } finally {
3911      try {
3912        flushThread.done();
3913        flushThread.join();
3914        flushThread.checkNoError();
3915
3916        putThread.join();
3917        putThread.checkNoError();
3918      } catch (InterruptedException ie) {
3919        LOG.warn("Caught exception when joining with flushThread", ie);
3920      }
3921
3922      try {
3923          HBaseTestingUtility.closeRegionAndWAL(this.region);
3924      } catch (DroppedSnapshotException dse) {
3925        // We could get this on way out because we interrupt the background flusher and it could
3926        // fail anywhere causing a DSE over in the background flusher... only it is not properly
3927        // dealt with so could still be memory hanging out when we get to here -- memory we can't
3928        // flush because the accounting is 'off' since original DSE.
3929      }
3930      this.region = null;
3931    }
3932  }
3933
3934  protected class PutThread extends Thread {
3935    private volatile boolean done;
3936    private volatile int numPutsFinished = 0;
3937
3938    private Throwable error = null;
3939    private int numRows;
3940    private byte[][] families;
3941    private byte[][] qualifiers;
3942
3943    private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
3944      super("PutThread");
3945      this.numRows = numRows;
3946      this.families = families;
3947      this.qualifiers = qualifiers;
3948    }
3949
3950    /**
3951     * Block calling thread until this instance of PutThread has put at least one row.
3952     */
3953    public void waitForFirstPut() throws InterruptedException {
3954      // wait until put thread actually puts some data
3955      while (isAlive() && numPutsFinished == 0) {
3956        checkNoError();
3957        Thread.sleep(50);
3958      }
3959    }
3960
3961    public void done() {
3962      done = true;
3963      synchronized (this) {
3964        interrupt();
3965      }
3966    }
3967
3968    public void checkNoError() {
3969      if (error != null) {
3970        assertNull(error);
3971      }
3972    }
3973
3974    @Override
3975    public void run() {
3976      done = false;
3977      while (!done) {
3978        try {
3979          for (int r = 0; r < numRows; r++) {
3980            byte[] row = Bytes.toBytes("row" + r);
3981            Put put = new Put(row);
3982            put.setDurability(Durability.SKIP_WAL);
3983            byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
3984            for (byte[] family : families) {
3985              for (byte[] qualifier : qualifiers) {
3986                put.addColumn(family, qualifier, numPutsFinished, value);
3987              }
3988            }
3989            region.put(put);
3990            numPutsFinished++;
3991            if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
3992              System.out.println("put iteration = " + numPutsFinished);
3993              Delete delete = new Delete(row, (long) numPutsFinished - 30);
3994              region.delete(delete);
3995            }
3996            numPutsFinished++;
3997          }
3998        } catch (InterruptedIOException e) {
3999          // This is fine. It means we are done, or didn't get the lock on time
4000          LOG.info("Interrupted", e);
4001        } catch (IOException e) {
4002          LOG.error("Error while putting records", e);
4003          error = e;
4004          break;
4005        }
4006      }
4007
4008    }
4009
4010  }
4011
4012  /**
4013   * Writes very wide records and gets the latest row every time.. Flushes and
4014   * compacts the region aggressivly to catch issues.
4015   *
4016   * @throws IOException
4017   *           by flush / scan / compaction
4018   * @throws InterruptedException
4019   *           when joining threads
4020   */
4021  @Test
4022  public void testWritesWhileGetting() throws Exception {
4023    int testCount = 50;
4024    int numRows = 1;
4025    int numFamilies = 10;
4026    int numQualifiers = 100;
4027    int compactInterval = 100;
4028    byte[][] families = new byte[numFamilies][];
4029    for (int i = 0; i < numFamilies; i++) {
4030      families[i] = Bytes.toBytes("family" + i);
4031    }
4032    byte[][] qualifiers = new byte[numQualifiers][];
4033    for (int i = 0; i < numQualifiers; i++) {
4034      qualifiers[i] = Bytes.toBytes("qual" + i);
4035    }
4036
4037
4038    // This test flushes constantly and can cause many files to be created,
4039    // possibly
4040    // extending over the ulimit. Make sure compactions are aggressive in
4041    // reducing
4042    // the number of HFiles created.
4043    Configuration conf = HBaseConfiguration.create(CONF);
4044    conf.setInt("hbase.hstore.compaction.min", 1);
4045    conf.setInt("hbase.hstore.compaction.max", 1000);
4046    this.region = initHRegion(tableName, method, conf, families);
4047    PutThread putThread = null;
4048    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
4049    try {
4050      putThread = new PutThread(numRows, families, qualifiers);
4051      putThread.start();
4052      putThread.waitForFirstPut();
4053
4054      // Add a thread that flushes as fast as possible
4055      ctx.addThread(new RepeatingTestThread(ctx) {
4056
4057        @Override
4058        public void doAnAction() throws Exception {
4059          region.flush(true);
4060          // Compact regularly to avoid creating too many files and exceeding
4061          // the ulimit.
4062          region.compact(false);
4063          for (HStore store : region.getStores()) {
4064            store.closeAndArchiveCompactedFiles();
4065          }
4066        }
4067      });
4068      ctx.startThreads();
4069
4070      Get get = new Get(Bytes.toBytes("row0"));
4071      Result result = null;
4072
4073      int expectedCount = numFamilies * numQualifiers;
4074
4075      long prevTimestamp = 0L;
4076      for (int i = 0; i < testCount; i++) {
4077        LOG.info("testWritesWhileGetting verify turn " + i);
4078        boolean previousEmpty = result == null || result.isEmpty();
4079        result = region.get(get);
4080        if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
4081          assertEquals("i=" + i, expectedCount, result.size());
4082          // TODO this was removed, now what dangit?!
4083          // search looking for the qualifier in question?
4084          long timestamp = 0;
4085          for (Cell kv : result.rawCells()) {
4086            if (CellUtil.matchingFamily(kv, families[0])
4087                && CellUtil.matchingQualifier(kv, qualifiers[0])) {
4088              timestamp = kv.getTimestamp();
4089            }
4090          }
4091          assertTrue(timestamp >= prevTimestamp);
4092          prevTimestamp = timestamp;
4093          Cell previousKV = null;
4094
4095          for (Cell kv : result.rawCells()) {
4096            byte[] thisValue = CellUtil.cloneValue(kv);
4097            if (previousKV != null) {
4098              if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) {
4099                LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV
4100                    + "(memStoreTS:" + previousKV.getSequenceId() + ")" + ", New KV: " + kv
4101                    + "(memStoreTS:" + kv.getSequenceId() + ")");
4102                assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue));
4103              }
4104            }
4105            previousKV = kv;
4106          }
4107        }
4108      }
4109    } finally {
4110      if (putThread != null)
4111        putThread.done();
4112
4113      region.flush(true);
4114
4115      if (putThread != null) {
4116        putThread.join();
4117        putThread.checkNoError();
4118      }
4119
4120      ctx.stop();
4121      HBaseTestingUtility.closeRegionAndWAL(this.region);
4122      this.region = null;
4123    }
4124  }
4125
4126  @Test
4127  public void testHolesInMeta() throws Exception {
4128    byte[] family = Bytes.toBytes("family");
4129    this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
4130        false, family);
4131    try {
4132      byte[] rowNotServed = Bytes.toBytes("a");
4133      Get g = new Get(rowNotServed);
4134      try {
4135        region.get(g);
4136        fail();
4137      } catch (WrongRegionException x) {
4138        // OK
4139      }
4140      byte[] row = Bytes.toBytes("y");
4141      g = new Get(row);
4142      region.get(g);
4143    } finally {
4144      HBaseTestingUtility.closeRegionAndWAL(this.region);
4145      this.region = null;
4146    }
4147  }
4148
4149  @Test
4150  public void testIndexesScanWithOneDeletedRow() throws IOException {
4151    byte[] family = Bytes.toBytes("family");
4152
4153    // Setting up region
4154    this.region = initHRegion(tableName, method, CONF, family);
4155    try {
4156      Put put = new Put(Bytes.toBytes(1L));
4157      put.addColumn(family, qual1, 1L, Bytes.toBytes(1L));
4158      region.put(put);
4159
4160      region.flush(true);
4161
4162      Delete delete = new Delete(Bytes.toBytes(1L), 1L);
4163      region.delete(delete);
4164
4165      put = new Put(Bytes.toBytes(2L));
4166      put.addColumn(family, qual1, 2L, Bytes.toBytes(2L));
4167      region.put(put);
4168
4169      Scan idxScan = new Scan();
4170      idxScan.addFamily(family);
4171      idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList(
4172          new SingleColumnValueFilter(family, qual1, CompareOp.GREATER_OR_EQUAL,
4173              new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1,
4174              CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L))))));
4175      InternalScanner scanner = region.getScanner(idxScan);
4176      List<Cell> res = new ArrayList<>();
4177
4178      while (scanner.next(res))
4179        ;
4180      assertEquals(1L, res.size());
4181    } finally {
4182      HBaseTestingUtility.closeRegionAndWAL(this.region);
4183      this.region = null;
4184    }
4185  }
4186
4187  // ////////////////////////////////////////////////////////////////////////////
4188  // Bloom filter test
4189  // ////////////////////////////////////////////////////////////////////////////
4190  @Test
4191  public void testBloomFilterSize() throws IOException {
4192    byte[] fam1 = Bytes.toBytes("fam1");
4193    byte[] qf1 = Bytes.toBytes("col");
4194    byte[] val1 = Bytes.toBytes("value1");
4195    // Create Table
4196    HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE)
4197        .setBloomFilterType(BloomType.ROWCOL);
4198
4199    HTableDescriptor htd = new HTableDescriptor(tableName);
4200    htd.addFamily(hcd);
4201    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4202    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4203    try {
4204      int num_unique_rows = 10;
4205      int duplicate_multiplier = 2;
4206      int num_storefiles = 4;
4207
4208      int version = 0;
4209      for (int f = 0; f < num_storefiles; f++) {
4210        for (int i = 0; i < duplicate_multiplier; i++) {
4211          for (int j = 0; j < num_unique_rows; j++) {
4212            Put put = new Put(Bytes.toBytes("row" + j));
4213            put.setDurability(Durability.SKIP_WAL);
4214            long ts = version++;
4215            put.addColumn(fam1, qf1, ts, val1);
4216            region.put(put);
4217          }
4218        }
4219        region.flush(true);
4220      }
4221      // before compaction
4222      HStore store = region.getStore(fam1);
4223      Collection<HStoreFile> storeFiles = store.getStorefiles();
4224      for (HStoreFile storefile : storeFiles) {
4225        StoreFileReader reader = storefile.getReader();
4226        reader.loadFileInfo();
4227        reader.loadBloomfilter();
4228        assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
4229        assertEquals(num_unique_rows, reader.getFilterEntries());
4230      }
4231
4232      region.compact(true);
4233
4234      // after compaction
4235      storeFiles = store.getStorefiles();
4236      for (HStoreFile storefile : storeFiles) {
4237        StoreFileReader reader = storefile.getReader();
4238        reader.loadFileInfo();
4239        reader.loadBloomfilter();
4240        assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());
4241        assertEquals(num_unique_rows, reader.getFilterEntries());
4242      }
4243    } finally {
4244      HBaseTestingUtility.closeRegionAndWAL(this.region);
4245      this.region = null;
4246    }
4247  }
4248
4249  @Test
4250  public void testAllColumnsWithBloomFilter() throws IOException {
4251    byte[] TABLE = Bytes.toBytes(name.getMethodName());
4252    byte[] FAMILY = Bytes.toBytes("family");
4253
4254    // Create table
4255    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE)
4256        .setBloomFilterType(BloomType.ROWCOL);
4257    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
4258    htd.addFamily(hcd);
4259    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4260    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4261    try {
4262      // For row:0, col:0: insert versions 1 through 5.
4263      byte row[] = Bytes.toBytes("row:" + 0);
4264      byte column[] = Bytes.toBytes("column:" + 0);
4265      Put put = new Put(row);
4266      put.setDurability(Durability.SKIP_WAL);
4267      for (long idx = 1; idx <= 4; idx++) {
4268        put.addColumn(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx));
4269      }
4270      region.put(put);
4271
4272      // Flush
4273      region.flush(true);
4274
4275      // Get rows
4276      Get get = new Get(row);
4277      get.setMaxVersions();
4278      Cell[] kvs = region.get(get).rawCells();
4279
4280      // Check if rows are correct
4281      assertEquals(4, kvs.length);
4282      checkOneCell(kvs[0], FAMILY, 0, 0, 4);
4283      checkOneCell(kvs[1], FAMILY, 0, 0, 3);
4284      checkOneCell(kvs[2], FAMILY, 0, 0, 2);
4285      checkOneCell(kvs[3], FAMILY, 0, 0, 1);
4286    } finally {
4287      HBaseTestingUtility.closeRegionAndWAL(this.region);
4288      this.region = null;
4289    }
4290  }
4291
4292  /**
4293   * Testcase to cover bug-fix for HBASE-2823 Ensures correct delete when
4294   * issuing delete row on columns with bloom filter set to row+col
4295   * (BloomType.ROWCOL)
4296   */
4297  @Test
4298  public void testDeleteRowWithBloomFilter() throws IOException {
4299    byte[] familyName = Bytes.toBytes("familyName");
4300
4301    // Create Table
4302    HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE)
4303        .setBloomFilterType(BloomType.ROWCOL);
4304
4305    HTableDescriptor htd = new HTableDescriptor(tableName);
4306    htd.addFamily(hcd);
4307    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4308    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4309    try {
4310      // Insert some data
4311      byte row[] = Bytes.toBytes("row1");
4312      byte col[] = Bytes.toBytes("col1");
4313
4314      Put put = new Put(row);
4315      put.addColumn(familyName, col, 1, Bytes.toBytes("SomeRandomValue"));
4316      region.put(put);
4317      region.flush(true);
4318
4319      Delete del = new Delete(row);
4320      region.delete(del);
4321      region.flush(true);
4322
4323      // Get remaining rows (should have none)
4324      Get get = new Get(row);
4325      get.addColumn(familyName, col);
4326
4327      Cell[] keyValues = region.get(get).rawCells();
4328      assertTrue(keyValues.length == 0);
4329    } finally {
4330      HBaseTestingUtility.closeRegionAndWAL(this.region);
4331      this.region = null;
4332    }
4333  }
4334
4335  @Test
4336  public void testgetHDFSBlocksDistribution() throws Exception {
4337    HBaseTestingUtility htu = new HBaseTestingUtility();
4338    // Why do we set the block size in this test?  If we set it smaller than the kvs, then we'll
4339    // break up the file in to more pieces that can be distributed across the three nodes and we
4340    // won't be able to have the condition this test asserts; that at least one node has
4341    // a copy of all replicas -- if small block size, then blocks are spread evenly across the
4342    // the three nodes.  hfilev3 with tags seems to put us over the block size.  St.Ack.
4343    // final int DEFAULT_BLOCK_SIZE = 1024;
4344    // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
4345    htu.getConfiguration().setInt("dfs.replication", 2);
4346
4347    // set up a cluster with 3 nodes
4348    MiniHBaseCluster cluster = null;
4349    String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
4350    int regionServersCount = 3;
4351
4352    try {
4353      cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
4354      byte[][] families = { fam1, fam2 };
4355      Table ht = htu.createTable(tableName, families);
4356
4357      // Setting up region
4358      byte row[] = Bytes.toBytes("row1");
4359      byte col[] = Bytes.toBytes("col1");
4360
4361      Put put = new Put(row);
4362      put.addColumn(fam1, col, 1, Bytes.toBytes("test1"));
4363      put.addColumn(fam2, col, 1, Bytes.toBytes("test2"));
4364      ht.put(put);
4365
4366      HRegion firstRegion = htu.getHBaseCluster().getRegions(tableName).get(0);
4367      firstRegion.flush(true);
4368      HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution();
4369
4370      // Given the default replication factor is 2 and we have 2 HFiles,
4371      // we will have total of 4 replica of blocks on 3 datanodes; thus there
4372      // must be at least one host that have replica for 2 HFiles. That host's
4373      // weight will be equal to the unique block weight.
4374      long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight();
4375      StringBuilder sb = new StringBuilder();
4376      for (String host: blocksDistribution1.getTopHosts()) {
4377        if (sb.length() > 0) sb.append(", ");
4378        sb.append(host);
4379        sb.append("=");
4380        sb.append(blocksDistribution1.getWeight(host));
4381      }
4382
4383      String topHost = blocksDistribution1.getTopHosts().get(0);
4384      long topHostWeight = blocksDistribution1.getWeight(topHost);
4385      String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" +
4386        topHostWeight + ", topHost=" + topHost + "; " + sb.toString();
4387      LOG.info(msg);
4388      assertTrue(msg, uniqueBlocksWeight1 == topHostWeight);
4389
4390      // use the static method to compute the value, it should be the same.
4391      // static method is used by load balancer or other components
4392      HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(
4393          htu.getConfiguration(), firstRegion.getTableDescriptor(), firstRegion.getRegionInfo());
4394      long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight();
4395
4396      assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
4397
4398      ht.close();
4399    } finally {
4400      if (cluster != null) {
4401        htu.shutdownMiniCluster();
4402      }
4403    }
4404  }
4405
4406  /**
4407   * Testcase to check state of region initialization task set to ABORTED or not
4408   * if any exceptions during initialization
4409   *
4410   * @throws Exception
4411   */
4412  @Test
4413  public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception {
4414    HRegionInfo info;
4415    try {
4416      FileSystem fs = Mockito.mock(FileSystem.class);
4417      Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException());
4418      HTableDescriptor htd = new HTableDescriptor(tableName);
4419      htd.addFamily(new HColumnDescriptor("cf"));
4420      info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
4421          HConstants.EMPTY_BYTE_ARRAY, false);
4422      Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
4423      region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
4424      // region initialization throws IOException and set task state to ABORTED.
4425      region.initialize();
4426      fail("Region initialization should fail due to IOException");
4427    } catch (IOException io) {
4428      List<MonitoredTask> tasks = TaskMonitor.get().getTasks();
4429      for (MonitoredTask monitoredTask : tasks) {
4430        if (!(monitoredTask instanceof MonitoredRPCHandler)
4431            && monitoredTask.getDescription().contains(region.toString())) {
4432          assertTrue("Region state should be ABORTED.",
4433              monitoredTask.getState().equals(MonitoredTask.State.ABORTED));
4434          break;
4435        }
4436      }
4437    } finally {
4438      HBaseTestingUtility.closeRegionAndWAL(region);
4439    }
4440  }
4441
4442  /**
4443   * Verifies that the .regioninfo file is written on region creation and that
4444   * is recreated if missing during region opening.
4445   */
4446  @Test
4447  public void testRegionInfoFileCreation() throws IOException {
4448    Path rootDir = new Path(dir + "testRegionInfoFileCreation");
4449
4450    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4451    htd.addFamily(new HColumnDescriptor("cf"));
4452
4453    HRegionInfo hri = new HRegionInfo(htd.getTableName());
4454
4455    // Create a region and skip the initialization (like CreateTableHandler)
4456    HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, rootDir, CONF, htd, false);
4457    Path regionDir = region.getRegionFileSystem().getRegionDir();
4458    FileSystem fs = region.getRegionFileSystem().getFileSystem();
4459    HBaseTestingUtility.closeRegionAndWAL(region);
4460
4461    Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
4462
4463    // Verify that the .regioninfo file is present
4464    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4465        fs.exists(regionInfoFile));
4466
4467    // Try to open the region
4468    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4469    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4470    HBaseTestingUtility.closeRegionAndWAL(region);
4471
4472    // Verify that the .regioninfo file is still there
4473    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4474        fs.exists(regionInfoFile));
4475
4476    // Remove the .regioninfo file and verify is recreated on region open
4477    fs.delete(regionInfoFile, true);
4478    assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
4479        fs.exists(regionInfoFile));
4480
4481    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4482//    region = TEST_UTIL.openHRegion(hri, htd);
4483    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4484    HBaseTestingUtility.closeRegionAndWAL(region);
4485
4486    // Verify that the .regioninfo file is still there
4487    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4488        fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE)));
4489  }
4490
4491  /**
4492   * TestCase for increment
4493   */
4494  private static class Incrementer implements Runnable {
4495    private HRegion region;
4496    private final static byte[] incRow = Bytes.toBytes("incRow");
4497    private final static byte[] family = Bytes.toBytes("family");
4498    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4499    private final static long ONE = 1L;
4500    private int incCounter;
4501
4502    public Incrementer(HRegion region, int incCounter) {
4503      this.region = region;
4504      this.incCounter = incCounter;
4505    }
4506
4507    @Override
4508    public void run() {
4509      int count = 0;
4510      while (count < incCounter) {
4511        Increment inc = new Increment(incRow);
4512        inc.addColumn(family, qualifier, ONE);
4513        count++;
4514        try {
4515          region.increment(inc);
4516        } catch (IOException e) {
4517          LOG.info("Count=" + count + ", " + e);
4518          break;
4519        }
4520      }
4521    }
4522  }
4523
4524  /**
4525   * Test case to check increment function with memstore flushing
4526   * @throws Exception
4527   */
4528  @Test
4529  public void testParallelIncrementWithMemStoreFlush() throws Exception {
4530    byte[] family = Incrementer.family;
4531    this.region = initHRegion(tableName, method, CONF, family);
4532    final HRegion region = this.region;
4533    final AtomicBoolean incrementDone = new AtomicBoolean(false);
4534    Runnable flusher = new Runnable() {
4535      @Override
4536      public void run() {
4537        while (!incrementDone.get()) {
4538          try {
4539            region.flush(true);
4540          } catch (Exception e) {
4541            e.printStackTrace();
4542          }
4543        }
4544      }
4545    };
4546
4547    // after all increment finished, the row will increment to 20*100 = 2000
4548    int threadNum = 20;
4549    int incCounter = 100;
4550    long expected = (long) threadNum * incCounter;
4551    Thread[] incrementers = new Thread[threadNum];
4552    Thread flushThread = new Thread(flusher);
4553    for (int i = 0; i < threadNum; i++) {
4554      incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
4555      incrementers[i].start();
4556    }
4557    flushThread.start();
4558    for (int i = 0; i < threadNum; i++) {
4559      incrementers[i].join();
4560    }
4561
4562    incrementDone.set(true);
4563    flushThread.join();
4564
4565    Get get = new Get(Incrementer.incRow);
4566    get.addColumn(Incrementer.family, Incrementer.qualifier);
4567    get.setMaxVersions(1);
4568    Result res = this.region.get(get);
4569    List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
4570
4571    // we just got the latest version
4572    assertEquals(1, kvs.size());
4573    Cell kv = kvs.get(0);
4574    assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
4575    this.region = null;
4576  }
4577
4578  /**
4579   * TestCase for append
4580   */
4581  private static class Appender implements Runnable {
4582    private HRegion region;
4583    private final static byte[] appendRow = Bytes.toBytes("appendRow");
4584    private final static byte[] family = Bytes.toBytes("family");
4585    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4586    private final static byte[] CHAR = Bytes.toBytes("a");
4587    private int appendCounter;
4588
4589    public Appender(HRegion region, int appendCounter) {
4590      this.region = region;
4591      this.appendCounter = appendCounter;
4592    }
4593
4594    @Override
4595    public void run() {
4596      int count = 0;
4597      while (count < appendCounter) {
4598        Append app = new Append(appendRow);
4599        app.addColumn(family, qualifier, CHAR);
4600        count++;
4601        try {
4602          region.append(app);
4603        } catch (IOException e) {
4604          LOG.info("Count=" + count + ", max=" + appendCounter + ", " + e);
4605          break;
4606        }
4607      }
4608    }
4609  }
4610
4611  /**
4612   * Test case to check append function with memstore flushing
4613   * @throws Exception
4614   */
4615  @Test
4616  public void testParallelAppendWithMemStoreFlush() throws Exception {
4617    byte[] family = Appender.family;
4618    this.region = initHRegion(tableName, method, CONF, family);
4619    final HRegion region = this.region;
4620    final AtomicBoolean appendDone = new AtomicBoolean(false);
4621    Runnable flusher = new Runnable() {
4622      @Override
4623      public void run() {
4624        while (!appendDone.get()) {
4625          try {
4626            region.flush(true);
4627          } catch (Exception e) {
4628            e.printStackTrace();
4629          }
4630        }
4631      }
4632    };
4633
4634    // After all append finished, the value will append to threadNum *
4635    // appendCounter Appender.CHAR
4636    int threadNum = 20;
4637    int appendCounter = 100;
4638    byte[] expected = new byte[threadNum * appendCounter];
4639    for (int i = 0; i < threadNum * appendCounter; i++) {
4640      System.arraycopy(Appender.CHAR, 0, expected, i, 1);
4641    }
4642    Thread[] appenders = new Thread[threadNum];
4643    Thread flushThread = new Thread(flusher);
4644    for (int i = 0; i < threadNum; i++) {
4645      appenders[i] = new Thread(new Appender(this.region, appendCounter));
4646      appenders[i].start();
4647    }
4648    flushThread.start();
4649    for (int i = 0; i < threadNum; i++) {
4650      appenders[i].join();
4651    }
4652
4653    appendDone.set(true);
4654    flushThread.join();
4655
4656    Get get = new Get(Appender.appendRow);
4657    get.addColumn(Appender.family, Appender.qualifier);
4658    get.setMaxVersions(1);
4659    Result res = this.region.get(get);
4660    List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
4661
4662    // we just got the latest version
4663    assertEquals(1, kvs.size());
4664    Cell kv = kvs.get(0);
4665    byte[] appendResult = new byte[kv.getValueLength()];
4666    System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
4667    assertArrayEquals(expected, appendResult);
4668    this.region = null;
4669  }
4670
4671  /**
4672   * Test case to check put function with memstore flushing for same row, same ts
4673   * @throws Exception
4674   */
4675  @Test
4676  public void testPutWithMemStoreFlush() throws Exception {
4677    byte[] family = Bytes.toBytes("family");
4678    byte[] qualifier = Bytes.toBytes("qualifier");
4679    byte[] row = Bytes.toBytes("putRow");
4680    byte[] value = null;
4681    this.region = initHRegion(tableName, method, CONF, family);
4682    Put put = null;
4683    Get get = null;
4684    List<Cell> kvs = null;
4685    Result res = null;
4686
4687    put = new Put(row);
4688    value = Bytes.toBytes("value0");
4689    put.addColumn(family, qualifier, 1234567L, value);
4690    region.put(put);
4691    get = new Get(row);
4692    get.addColumn(family, qualifier);
4693    get.setMaxVersions();
4694    res = this.region.get(get);
4695    kvs = res.getColumnCells(family, qualifier);
4696    assertEquals(1, kvs.size());
4697    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4698
4699    region.flush(true);
4700    get = new Get(row);
4701    get.addColumn(family, qualifier);
4702    get.setMaxVersions();
4703    res = this.region.get(get);
4704    kvs = res.getColumnCells(family, qualifier);
4705    assertEquals(1, kvs.size());
4706    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4707
4708    put = new Put(row);
4709    value = Bytes.toBytes("value1");
4710    put.addColumn(family, qualifier, 1234567L, value);
4711    region.put(put);
4712    get = new Get(row);
4713    get.addColumn(family, qualifier);
4714    get.setMaxVersions();
4715    res = this.region.get(get);
4716    kvs = res.getColumnCells(family, qualifier);
4717    assertEquals(1, kvs.size());
4718    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4719
4720    region.flush(true);
4721    get = new Get(row);
4722    get.addColumn(family, qualifier);
4723    get.setMaxVersions();
4724    res = this.region.get(get);
4725    kvs = res.getColumnCells(family, qualifier);
4726    assertEquals(1, kvs.size());
4727    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4728  }
4729
4730  @Test
4731  public void testDurability() throws Exception {
4732    // there are 5 x 5 cases:
4733    // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation
4734    // durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT)
4735
4736    // expected cases for append and sync wal
4737    durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4738    durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4739    durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4740
4741    durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4742    durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4743    durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4744
4745    durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4746    durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4747
4748    durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
4749    durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4750
4751    durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
4752    durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
4753    durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
4754
4755    // expected cases for async wal
4756    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4757    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4758    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4759    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4760    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
4761    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4762
4763    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4764    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4765    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4766    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4767    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
4768    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
4769
4770    // expect skip wal cases
4771    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4772    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4773    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4774    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
4775    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
4776    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
4777
4778  }
4779
4780  private void durabilityTest(String method, Durability tableDurability,
4781      Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
4782      final boolean expectSyncFromLogSyncer) throws Exception {
4783    Configuration conf = HBaseConfiguration.create(CONF);
4784    method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
4785    byte[] family = Bytes.toBytes("family");
4786    Path logDir = new Path(new Path(dir + method), "log");
4787    final Configuration walConf = new Configuration(conf);
4788    FSUtils.setRootDir(walConf, logDir);
4789    // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not
4790    // deal with classes which have a field of an inner class. See discussions in HBASE-15536.
4791    walConf.set(WALFactory.WAL_PROVIDER, "filesystem");
4792    final WALFactory wals = new WALFactory(walConf, TEST_UTIL.getRandomUUID().toString());
4793    final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()));
4794    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
4795        HConstants.EMPTY_END_ROW, false, tableDurability, wal,
4796        new byte[][] { family });
4797
4798    Put put = new Put(Bytes.toBytes("r1"));
4799    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
4800    put.setDurability(mutationDurability);
4801    region.put(put);
4802
4803    // verify append called or not
4804    verify(wal, expectAppend ? times(1) : never()).appendData((HRegionInfo) any(),
4805      (WALKeyImpl) any(), (WALEdit) any());
4806
4807    // verify sync called or not
4808    if (expectSync || expectSyncFromLogSyncer) {
4809      TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
4810        @Override
4811        public boolean evaluate() throws Exception {
4812          try {
4813            if (expectSync) {
4814              verify(wal, times(1)).sync(anyLong()); // Hregion calls this one
4815            } else if (expectSyncFromLogSyncer) {
4816              verify(wal, times(1)).sync(); // wal syncer calls this one
4817            }
4818          } catch (Throwable ignore) {
4819          }
4820          return true;
4821        }
4822      });
4823    } else {
4824      //verify(wal, never()).sync(anyLong());
4825      verify(wal, never()).sync();
4826    }
4827
4828    HBaseTestingUtility.closeRegionAndWAL(this.region);
4829    wals.close();
4830    this.region = null;
4831  }
4832
4833  @Test
4834  public void testRegionReplicaSecondary() throws IOException {
4835    // create a primary region, load some data and flush
4836    // create a secondary region, and do a get against that
4837    Path rootDir = new Path(dir + name.getMethodName());
4838    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4839
4840    byte[][] families = new byte[][] {
4841        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4842    };
4843    byte[] cq = Bytes.toBytes("cq");
4844    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4845    for (byte[] family : families) {
4846      htd.addFamily(new HColumnDescriptor(family));
4847    }
4848
4849    long time = System.currentTimeMillis();
4850    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4851      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4852      false, time, 0);
4853    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4854      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4855      false, time, 1);
4856
4857    HRegion primaryRegion = null, secondaryRegion = null;
4858
4859    try {
4860      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4861          rootDir, TEST_UTIL.getConfiguration(), htd);
4862
4863      // load some data
4864      putData(primaryRegion, 0, 1000, cq, families);
4865
4866      // flush region
4867      primaryRegion.flush(true);
4868
4869      // open secondary region
4870      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4871
4872      verifyData(secondaryRegion, 0, 1000, cq, families);
4873    } finally {
4874      if (primaryRegion != null) {
4875        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4876      }
4877      if (secondaryRegion != null) {
4878        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4879      }
4880    }
4881  }
4882
4883  @Test
4884  public void testRegionReplicaSecondaryIsReadOnly() throws IOException {
4885    // create a primary region, load some data and flush
4886    // create a secondary region, and do a put against that
4887    Path rootDir = new Path(dir + name.getMethodName());
4888    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4889
4890    byte[][] families = new byte[][] {
4891        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4892    };
4893    byte[] cq = Bytes.toBytes("cq");
4894    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4895    for (byte[] family : families) {
4896      htd.addFamily(new HColumnDescriptor(family));
4897    }
4898
4899    long time = System.currentTimeMillis();
4900    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4901      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4902      false, time, 0);
4903    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4904      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4905      false, time, 1);
4906
4907    HRegion primaryRegion = null, secondaryRegion = null;
4908
4909    try {
4910      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4911          rootDir, TEST_UTIL.getConfiguration(), htd);
4912
4913      // load some data
4914      putData(primaryRegion, 0, 1000, cq, families);
4915
4916      // flush region
4917      primaryRegion.flush(true);
4918
4919      // open secondary region
4920      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4921
4922      try {
4923        putData(secondaryRegion, 0, 1000, cq, families);
4924        fail("Should have thrown exception");
4925      } catch (IOException ex) {
4926        // expected
4927      }
4928    } finally {
4929      if (primaryRegion != null) {
4930        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4931      }
4932      if (secondaryRegion != null) {
4933        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4934      }
4935    }
4936  }
4937
4938  static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
4939    Configuration confForWAL = new Configuration(conf);
4940    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
4941    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8));
4942  }
4943
4944  @Test
4945  public void testCompactionFromPrimary() throws IOException {
4946    Path rootDir = new Path(dir + name.getMethodName());
4947    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4948
4949    byte[][] families = new byte[][] {
4950        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4951    };
4952    byte[] cq = Bytes.toBytes("cq");
4953    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4954    for (byte[] family : families) {
4955      htd.addFamily(new HColumnDescriptor(family));
4956    }
4957
4958    long time = System.currentTimeMillis();
4959    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4960      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4961      false, time, 0);
4962    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4963      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4964      false, time, 1);
4965
4966    HRegion primaryRegion = null, secondaryRegion = null;
4967
4968    try {
4969      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4970          rootDir, TEST_UTIL.getConfiguration(), htd);
4971
4972      // load some data
4973      putData(primaryRegion, 0, 1000, cq, families);
4974
4975      // flush region
4976      primaryRegion.flush(true);
4977
4978      // open secondary region
4979      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4980
4981      // move the file of the primary region to the archive, simulating a compaction
4982      Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
4983      primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
4984      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
4985          .getStoreFiles(families[0]);
4986      Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
4987
4988      verifyData(secondaryRegion, 0, 1000, cq, families);
4989    } finally {
4990      if (primaryRegion != null) {
4991        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4992      }
4993      if (secondaryRegion != null) {
4994        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4995      }
4996    }
4997  }
4998
4999  private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws
5000      IOException {
5001    putData(this.region, startRow, numRows, qf, families);
5002  }
5003
5004  private void putData(HRegion region,
5005      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
5006    putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
5007  }
5008
5009  static void putData(HRegion region, Durability durability,
5010      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
5011    for (int i = startRow; i < startRow + numRows; i++) {
5012      Put put = new Put(Bytes.toBytes("" + i));
5013      put.setDurability(durability);
5014      for (byte[] family : families) {
5015        put.addColumn(family, qf, null);
5016      }
5017      region.put(put);
5018      LOG.info(put.toString());
5019    }
5020  }
5021
5022  static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
5023      throws IOException {
5024    for (int i = startRow; i < startRow + numRows; i++) {
5025      byte[] row = Bytes.toBytes("" + i);
5026      Get get = new Get(row);
5027      for (byte[] family : families) {
5028        get.addColumn(family, qf);
5029      }
5030      Result result = newReg.get(get);
5031      Cell[] raw = result.rawCells();
5032      assertEquals(families.length, result.size());
5033      for (int j = 0; j < families.length; j++) {
5034        assertTrue(CellUtil.matchingRows(raw[j], row));
5035        assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
5036        assertTrue(CellUtil.matchingQualifier(raw[j], qf));
5037      }
5038    }
5039  }
5040
5041  static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
5042    // Now I have k, get values out and assert they are as expected.
5043    Get get = new Get(k).addFamily(family).setMaxVersions();
5044    Cell[] results = r.get(get).rawCells();
5045    for (int j = 0; j < results.length; j++) {
5046      byte[] tmp = CellUtil.cloneValue(results[j]);
5047      // Row should be equal to value every time.
5048      assertTrue(Bytes.equals(k, tmp));
5049    }
5050  }
5051
5052  /*
5053   * Assert first value in the passed region is <code>firstValue</code>.
5054   *
5055   * @param r
5056   *
5057   * @param fs
5058   *
5059   * @param firstValue
5060   *
5061   * @throws IOException
5062   */
5063  protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
5064      throws IOException {
5065    byte[][] families = { fs };
5066    Scan scan = new Scan();
5067    for (int i = 0; i < families.length; i++)
5068      scan.addFamily(families[i]);
5069    InternalScanner s = r.getScanner(scan);
5070    try {
5071      List<Cell> curVals = new ArrayList<>();
5072      boolean first = true;
5073      OUTER_LOOP: while (s.next(curVals)) {
5074        for (Cell kv : curVals) {
5075          byte[] val = CellUtil.cloneValue(kv);
5076          byte[] curval = val;
5077          if (first) {
5078            first = false;
5079            assertTrue(Bytes.compareTo(curval, firstValue) == 0);
5080          } else {
5081            // Not asserting anything. Might as well break.
5082            break OUTER_LOOP;
5083          }
5084        }
5085      }
5086    } finally {
5087      s.close();
5088    }
5089  }
5090
5091  /**
5092   * Test that we get the expected flush results back
5093   */
5094  @Test
5095  public void testFlushResult() throws IOException {
5096    byte[] family = Bytes.toBytes("family");
5097
5098    this.region = initHRegion(tableName, method, family);
5099
5100    // empty memstore, flush doesn't run
5101    HRegion.FlushResult fr = region.flush(true);
5102    assertFalse(fr.isFlushSucceeded());
5103    assertFalse(fr.isCompactionNeeded());
5104
5105    // Flush enough files to get up to the threshold, doesn't need compactions
5106    for (int i = 0; i < 2; i++) {
5107      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
5108      region.put(put);
5109      fr = region.flush(true);
5110      assertTrue(fr.isFlushSucceeded());
5111      assertFalse(fr.isCompactionNeeded());
5112    }
5113
5114    // Two flushes after the threshold, compactions are needed
5115    for (int i = 0; i < 2; i++) {
5116      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
5117      region.put(put);
5118      fr = region.flush(true);
5119      assertTrue(fr.isFlushSucceeded());
5120      assertTrue(fr.isCompactionNeeded());
5121    }
5122  }
5123
5124  protected Configuration initSplit() {
5125    // Always compact if there is more than one store file.
5126    CONF.setInt("hbase.hstore.compactionThreshold", 2);
5127
5128    CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
5129
5130    // Increase the amount of time between client retries
5131    CONF.setLong("hbase.client.pause", 15 * 1000);
5132
5133    // This size should make it so we always split using the addContent
5134    // below. After adding all data, the first region is 1.3M
5135    CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
5136    return CONF;
5137  }
5138
5139  /**
5140   * @return A region on which you must call
5141   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5142   */
5143  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
5144      byte[]... families) throws IOException {
5145    return initHRegion(tableName, callingMethod, conf, false, families);
5146  }
5147
5148  /**
5149   * @return A region on which you must call
5150   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5151   */
5152  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
5153      boolean isReadOnly, byte[]... families) throws IOException {
5154    return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
5155  }
5156
5157  protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
5158      String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
5159      throws IOException {
5160    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
5161    HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
5162    final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
5163    return initHRegion(tableName, startKey, stopKey, isReadOnly,
5164        Durability.SYNC_WAL, wal, families);
5165  }
5166
5167  /**
5168   * @return A region on which you must call
5169   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5170   */
5171  public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
5172      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
5173    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
5174    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey,
5175        isReadOnly, durability, wal, families);
5176  }
5177
5178  /**
5179   * Assert that the passed in Cell has expected contents for the specified row,
5180   * column & timestamp.
5181   */
5182  private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
5183    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
5184    assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx,
5185        Bytes.toString(CellUtil.cloneRow(kv)));
5186    assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
5187        Bytes.toString(CellUtil.cloneFamily(kv)));
5188    assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
5189        Bytes.toString(CellUtil.cloneQualifier(kv)));
5190    assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
5191    assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
5192        Bytes.toString(CellUtil.cloneValue(kv)));
5193  }
5194
5195  @Test
5196  public void testReverseScanner_FromMemStore_SingleCF_Normal()
5197      throws IOException {
5198    byte[] rowC = Bytes.toBytes("rowC");
5199    byte[] rowA = Bytes.toBytes("rowA");
5200    byte[] rowB = Bytes.toBytes("rowB");
5201    byte[] cf = Bytes.toBytes("CF");
5202    byte[][] families = { cf };
5203    byte[] col = Bytes.toBytes("C");
5204    long ts = 1;
5205    this.region = initHRegion(tableName, method, families);
5206    try {
5207      KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5208      KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5209          null);
5210      KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5211      KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5212      Put put = null;
5213      put = new Put(rowC);
5214      put.add(kv1);
5215      put.add(kv11);
5216      region.put(put);
5217      put = new Put(rowA);
5218      put.add(kv2);
5219      region.put(put);
5220      put = new Put(rowB);
5221      put.add(kv3);
5222      region.put(put);
5223
5224      Scan scan = new Scan(rowC);
5225      scan.setMaxVersions(5);
5226      scan.setReversed(true);
5227      InternalScanner scanner = region.getScanner(scan);
5228      List<Cell> currRow = new ArrayList<>();
5229      boolean hasNext = scanner.next(currRow);
5230      assertEquals(2, currRow.size());
5231      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5232          .get(0).getRowLength(), rowC, 0, rowC.length));
5233      assertTrue(hasNext);
5234      currRow.clear();
5235      hasNext = scanner.next(currRow);
5236      assertEquals(1, currRow.size());
5237      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5238          .get(0).getRowLength(), rowB, 0, rowB.length));
5239      assertTrue(hasNext);
5240      currRow.clear();
5241      hasNext = scanner.next(currRow);
5242      assertEquals(1, currRow.size());
5243      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5244          .get(0).getRowLength(), rowA, 0, rowA.length));
5245      assertFalse(hasNext);
5246      scanner.close();
5247    } finally {
5248      HBaseTestingUtility.closeRegionAndWAL(this.region);
5249      this.region = null;
5250    }
5251  }
5252
5253  @Test
5254  public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
5255      throws IOException {
5256    byte[] rowC = Bytes.toBytes("rowC");
5257    byte[] rowA = Bytes.toBytes("rowA");
5258    byte[] rowB = Bytes.toBytes("rowB");
5259    byte[] rowD = Bytes.toBytes("rowD");
5260    byte[] cf = Bytes.toBytes("CF");
5261    byte[][] families = { cf };
5262    byte[] col = Bytes.toBytes("C");
5263    long ts = 1;
5264    this.region = initHRegion(tableName, method, families);
5265    try {
5266      KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5267      KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5268          null);
5269      KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5270      KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5271      Put put = null;
5272      put = new Put(rowC);
5273      put.add(kv1);
5274      put.add(kv11);
5275      region.put(put);
5276      put = new Put(rowA);
5277      put.add(kv2);
5278      region.put(put);
5279      put = new Put(rowB);
5280      put.add(kv3);
5281      region.put(put);
5282
5283      Scan scan = new Scan(rowD);
5284      List<Cell> currRow = new ArrayList<>();
5285      scan.setReversed(true);
5286      scan.setMaxVersions(5);
5287      InternalScanner scanner = region.getScanner(scan);
5288      boolean hasNext = scanner.next(currRow);
5289      assertEquals(2, currRow.size());
5290      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5291          .get(0).getRowLength(), rowC, 0, rowC.length));
5292      assertTrue(hasNext);
5293      currRow.clear();
5294      hasNext = scanner.next(currRow);
5295      assertEquals(1, currRow.size());
5296      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5297          .get(0).getRowLength(), rowB, 0, rowB.length));
5298      assertTrue(hasNext);
5299      currRow.clear();
5300      hasNext = scanner.next(currRow);
5301      assertEquals(1, currRow.size());
5302      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5303          .get(0).getRowLength(), rowA, 0, rowA.length));
5304      assertFalse(hasNext);
5305      scanner.close();
5306    } finally {
5307      HBaseTestingUtility.closeRegionAndWAL(this.region);
5308      this.region = null;
5309    }
5310  }
5311
5312  @Test
5313  public void testReverseScanner_FromMemStore_SingleCF_FullScan()
5314      throws IOException {
5315    byte[] rowC = Bytes.toBytes("rowC");
5316    byte[] rowA = Bytes.toBytes("rowA");
5317    byte[] rowB = Bytes.toBytes("rowB");
5318    byte[] cf = Bytes.toBytes("CF");
5319    byte[][] families = { cf };
5320    byte[] col = Bytes.toBytes("C");
5321    long ts = 1;
5322    this.region = initHRegion(tableName, method, families);
5323    try {
5324      KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5325      KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5326          null);
5327      KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5328      KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5329      Put put = null;
5330      put = new Put(rowC);
5331      put.add(kv1);
5332      put.add(kv11);
5333      region.put(put);
5334      put = new Put(rowA);
5335      put.add(kv2);
5336      region.put(put);
5337      put = new Put(rowB);
5338      put.add(kv3);
5339      region.put(put);
5340      Scan scan = new Scan();
5341      List<Cell> currRow = new ArrayList<>();
5342      scan.setReversed(true);
5343      InternalScanner scanner = region.getScanner(scan);
5344      boolean hasNext = scanner.next(currRow);
5345      assertEquals(1, currRow.size());
5346      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5347          .get(0).getRowLength(), rowC, 0, rowC.length));
5348      assertTrue(hasNext);
5349      currRow.clear();
5350      hasNext = scanner.next(currRow);
5351      assertEquals(1, currRow.size());
5352      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5353          .get(0).getRowLength(), rowB, 0, rowB.length));
5354      assertTrue(hasNext);
5355      currRow.clear();
5356      hasNext = scanner.next(currRow);
5357      assertEquals(1, currRow.size());
5358      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5359          .get(0).getRowLength(), rowA, 0, rowA.length));
5360      assertFalse(hasNext);
5361      scanner.close();
5362    } finally {
5363      HBaseTestingUtility.closeRegionAndWAL(this.region);
5364      this.region = null;
5365    }
5366  }
5367
5368  @Test
5369  public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
5370    // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
5371    byte[] rowA = Bytes.toBytes("rowA");
5372    byte[] rowB = Bytes.toBytes("rowB");
5373    byte[] rowC = Bytes.toBytes("rowC");
5374    byte[] rowD = Bytes.toBytes("rowD");
5375    byte[] rowE = Bytes.toBytes("rowE");
5376    byte[] cf = Bytes.toBytes("CF");
5377    byte[][] families = { cf };
5378    byte[] col1 = Bytes.toBytes("col1");
5379    byte[] col2 = Bytes.toBytes("col2");
5380    long ts = 1;
5381    this.region = initHRegion(tableName, method, families);
5382    try {
5383      KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5384      KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5385      KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5386      KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5387      KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5388      KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5389      Put put = null;
5390      put = new Put(rowA);
5391      put.add(kv1);
5392      region.put(put);
5393      put = new Put(rowB);
5394      put.add(kv2);
5395      region.put(put);
5396      put = new Put(rowC);
5397      put.add(kv3);
5398      region.put(put);
5399      put = new Put(rowD);
5400      put.add(kv4_1);
5401      region.put(put);
5402      put = new Put(rowD);
5403      put.add(kv4_2);
5404      region.put(put);
5405      put = new Put(rowE);
5406      put.add(kv5);
5407      region.put(put);
5408      region.flush(true);
5409      Scan scan = new Scan(rowD, rowA);
5410      scan.addColumn(families[0], col1);
5411      scan.setReversed(true);
5412      List<Cell> currRow = new ArrayList<>();
5413      InternalScanner scanner = region.getScanner(scan);
5414      boolean hasNext = scanner.next(currRow);
5415      assertEquals(1, currRow.size());
5416      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5417          .get(0).getRowLength(), rowD, 0, rowD.length));
5418      assertTrue(hasNext);
5419      currRow.clear();
5420      hasNext = scanner.next(currRow);
5421      assertEquals(1, currRow.size());
5422      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5423          .get(0).getRowLength(), rowC, 0, rowC.length));
5424      assertTrue(hasNext);
5425      currRow.clear();
5426      hasNext = scanner.next(currRow);
5427      assertEquals(1, currRow.size());
5428      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5429          .get(0).getRowLength(), rowB, 0, rowB.length));
5430      assertFalse(hasNext);
5431      scanner.close();
5432
5433      scan = new Scan(rowD, rowA);
5434      scan.addColumn(families[0], col2);
5435      scan.setReversed(true);
5436      currRow.clear();
5437      scanner = region.getScanner(scan);
5438      hasNext = scanner.next(currRow);
5439      assertEquals(1, currRow.size());
5440      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5441          .get(0).getRowLength(), rowD, 0, rowD.length));
5442      scanner.close();
5443    } finally {
5444      HBaseTestingUtility.closeRegionAndWAL(this.region);
5445      this.region = null;
5446    }
5447  }
5448
5449  @Test
5450  public void testReverseScanner_smaller_blocksize() throws IOException {
5451    // case to ensure no conflict with HFile index optimization
5452    byte[] rowA = Bytes.toBytes("rowA");
5453    byte[] rowB = Bytes.toBytes("rowB");
5454    byte[] rowC = Bytes.toBytes("rowC");
5455    byte[] rowD = Bytes.toBytes("rowD");
5456    byte[] rowE = Bytes.toBytes("rowE");
5457    byte[] cf = Bytes.toBytes("CF");
5458    byte[][] families = { cf };
5459    byte[] col1 = Bytes.toBytes("col1");
5460    byte[] col2 = Bytes.toBytes("col2");
5461    long ts = 1;
5462    HBaseConfiguration config = new HBaseConfiguration();
5463    config.setInt("test.block.size", 1);
5464    this.region = initHRegion(tableName, method, config, families);
5465    try {
5466      KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5467      KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5468      KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5469      KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5470      KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5471      KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5472      Put put = null;
5473      put = new Put(rowA);
5474      put.add(kv1);
5475      region.put(put);
5476      put = new Put(rowB);
5477      put.add(kv2);
5478      region.put(put);
5479      put = new Put(rowC);
5480      put.add(kv3);
5481      region.put(put);
5482      put = new Put(rowD);
5483      put.add(kv4_1);
5484      region.put(put);
5485      put = new Put(rowD);
5486      put.add(kv4_2);
5487      region.put(put);
5488      put = new Put(rowE);
5489      put.add(kv5);
5490      region.put(put);
5491      region.flush(true);
5492      Scan scan = new Scan(rowD, rowA);
5493      scan.addColumn(families[0], col1);
5494      scan.setReversed(true);
5495      List<Cell> currRow = new ArrayList<>();
5496      InternalScanner scanner = region.getScanner(scan);
5497      boolean hasNext = scanner.next(currRow);
5498      assertEquals(1, currRow.size());
5499      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5500          .get(0).getRowLength(), rowD, 0, rowD.length));
5501      assertTrue(hasNext);
5502      currRow.clear();
5503      hasNext = scanner.next(currRow);
5504      assertEquals(1, currRow.size());
5505      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5506          .get(0).getRowLength(), rowC, 0, rowC.length));
5507      assertTrue(hasNext);
5508      currRow.clear();
5509      hasNext = scanner.next(currRow);
5510      assertEquals(1, currRow.size());
5511      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5512          .get(0).getRowLength(), rowB, 0, rowB.length));
5513      assertFalse(hasNext);
5514      scanner.close();
5515
5516      scan = new Scan(rowD, rowA);
5517      scan.addColumn(families[0], col2);
5518      scan.setReversed(true);
5519      currRow.clear();
5520      scanner = region.getScanner(scan);
5521      hasNext = scanner.next(currRow);
5522      assertEquals(1, currRow.size());
5523      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5524          .get(0).getRowLength(), rowD, 0, rowD.length));
5525      scanner.close();
5526    } finally {
5527      HBaseTestingUtility.closeRegionAndWAL(this.region);
5528      this.region = null;
5529    }
5530  }
5531
5532  @Test
5533  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
5534      throws IOException {
5535    byte[] row0 = Bytes.toBytes("row0"); // 1 kv
5536    byte[] row1 = Bytes.toBytes("row1"); // 2 kv
5537    byte[] row2 = Bytes.toBytes("row2"); // 4 kv
5538    byte[] row3 = Bytes.toBytes("row3"); // 2 kv
5539    byte[] row4 = Bytes.toBytes("row4"); // 5 kv
5540    byte[] row5 = Bytes.toBytes("row5"); // 2 kv
5541    byte[] cf1 = Bytes.toBytes("CF1");
5542    byte[] cf2 = Bytes.toBytes("CF2");
5543    byte[] cf3 = Bytes.toBytes("CF3");
5544    byte[][] families = { cf1, cf2, cf3 };
5545    byte[] col = Bytes.toBytes("C");
5546    long ts = 1;
5547    HBaseConfiguration conf = new HBaseConfiguration();
5548    // disable compactions in this test.
5549    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5550    this.region = initHRegion(tableName, method, conf, families);
5551    try {
5552      // kv naming style: kv(row number) totalKvCountInThisRow seq no
5553      KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
5554          null);
5555      KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
5556          null);
5557      KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
5558          KeyValue.Type.Put, null);
5559      KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
5560          null);
5561      KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
5562          null);
5563      KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
5564          null);
5565      KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
5566          KeyValue.Type.Put, null);
5567      KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
5568          null);
5569      KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
5570          KeyValue.Type.Put, null);
5571      KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
5572          null);
5573      KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
5574          null);
5575      KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
5576          KeyValue.Type.Put, null);
5577      KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
5578          null);
5579      KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
5580          KeyValue.Type.Put, null);
5581      KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
5582          null);
5583      KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
5584          null);
5585      // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv)
5586      Put put = null;
5587      put = new Put(row1);
5588      put.add(kv1_2_1);
5589      region.put(put);
5590      put = new Put(row2);
5591      put.add(kv2_4_1);
5592      region.put(put);
5593      put = new Put(row4);
5594      put.add(kv4_5_4);
5595      put.add(kv4_5_5);
5596      region.put(put);
5597      region.flush(true);
5598      // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv)
5599      put = new Put(row4);
5600      put.add(kv4_5_1);
5601      put.add(kv4_5_3);
5602      region.put(put);
5603      put = new Put(row1);
5604      put.add(kv1_2_2);
5605      region.put(put);
5606      put = new Put(row2);
5607      put.add(kv2_4_4);
5608      region.put(put);
5609      region.flush(true);
5610      // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv)
5611      put = new Put(row4);
5612      put.add(kv4_5_2);
5613      region.put(put);
5614      put = new Put(row2);
5615      put.add(kv2_4_2);
5616      put.add(kv2_4_3);
5617      region.put(put);
5618      put = new Put(row3);
5619      put.add(kv3_2_2);
5620      region.put(put);
5621      region.flush(true);
5622      // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max)
5623      // ( 2 kv)
5624      put = new Put(row0);
5625      put.add(kv0_1_1);
5626      region.put(put);
5627      put = new Put(row3);
5628      put.add(kv3_2_1);
5629      region.put(put);
5630      put = new Put(row5);
5631      put.add(kv5_2_1);
5632      put.add(kv5_2_2);
5633      region.put(put);
5634      // scan range = ["row4", min), skip the max "row5"
5635      Scan scan = new Scan(row4);
5636      scan.setMaxVersions(5);
5637      scan.setBatch(3);
5638      scan.setReversed(true);
5639      InternalScanner scanner = region.getScanner(scan);
5640      List<Cell> currRow = new ArrayList<>();
5641      boolean hasNext = false;
5642      // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
5643      // included in scan range
5644      // "row4" takes 2 next() calls since batch=3
5645      hasNext = scanner.next(currRow);
5646      assertEquals(3, currRow.size());
5647      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5648          .get(0).getRowLength(), row4, 0, row4.length));
5649      assertTrue(hasNext);
5650      currRow.clear();
5651      hasNext = scanner.next(currRow);
5652      assertEquals(2, currRow.size());
5653      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
5654          currRow.get(0).getRowLength(), row4, 0,
5655        row4.length));
5656      assertTrue(hasNext);
5657      // 2. scan out "row3" (2 kv)
5658      currRow.clear();
5659      hasNext = scanner.next(currRow);
5660      assertEquals(2, currRow.size());
5661      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5662          .get(0).getRowLength(), row3, 0, row3.length));
5663      assertTrue(hasNext);
5664      // 3. scan out "row2" (4 kvs)
5665      // "row2" takes 2 next() calls since batch=3
5666      currRow.clear();
5667      hasNext = scanner.next(currRow);
5668      assertEquals(3, currRow.size());
5669      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5670          .get(0).getRowLength(), row2, 0, row2.length));
5671      assertTrue(hasNext);
5672      currRow.clear();
5673      hasNext = scanner.next(currRow);
5674      assertEquals(1, currRow.size());
5675      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5676          .get(0).getRowLength(), row2, 0, row2.length));
5677      assertTrue(hasNext);
5678      // 4. scan out "row1" (2 kv)
5679      currRow.clear();
5680      hasNext = scanner.next(currRow);
5681      assertEquals(2, currRow.size());
5682      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5683          .get(0).getRowLength(), row1, 0, row1.length));
5684      assertTrue(hasNext);
5685      // 5. scan out "row0" (1 kv)
5686      currRow.clear();
5687      hasNext = scanner.next(currRow);
5688      assertEquals(1, currRow.size());
5689      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5690          .get(0).getRowLength(), row0, 0, row0.length));
5691      assertFalse(hasNext);
5692
5693      scanner.close();
5694    } finally {
5695      HBaseTestingUtility.closeRegionAndWAL(this.region);
5696      this.region = null;
5697    }
5698  }
5699
5700  @Test
5701  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
5702      throws IOException {
5703    byte[] row1 = Bytes.toBytes("row1");
5704    byte[] row2 = Bytes.toBytes("row2");
5705    byte[] row3 = Bytes.toBytes("row3");
5706    byte[] row4 = Bytes.toBytes("row4");
5707    byte[] cf1 = Bytes.toBytes("CF1");
5708    byte[] cf2 = Bytes.toBytes("CF2");
5709    byte[] cf3 = Bytes.toBytes("CF3");
5710    byte[] cf4 = Bytes.toBytes("CF4");
5711    byte[][] families = { cf1, cf2, cf3, cf4 };
5712    byte[] col = Bytes.toBytes("C");
5713    long ts = 1;
5714    HBaseConfiguration conf = new HBaseConfiguration();
5715    // disable compactions in this test.
5716    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5717    this.region = initHRegion(tableName, method, conf, families);
5718    try {
5719      KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
5720      KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
5721      KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
5722      KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
5723      // storefile1
5724      Put put = new Put(row1);
5725      put.add(kv1);
5726      region.put(put);
5727      region.flush(true);
5728      // storefile2
5729      put = new Put(row2);
5730      put.add(kv2);
5731      region.put(put);
5732      region.flush(true);
5733      // storefile3
5734      put = new Put(row3);
5735      put.add(kv3);
5736      region.put(put);
5737      region.flush(true);
5738      // memstore
5739      put = new Put(row4);
5740      put.add(kv4);
5741      region.put(put);
5742      // scan range = ["row4", min)
5743      Scan scan = new Scan(row4);
5744      scan.setReversed(true);
5745      scan.setBatch(10);
5746      InternalScanner scanner = region.getScanner(scan);
5747      List<Cell> currRow = new ArrayList<>();
5748      boolean hasNext = scanner.next(currRow);
5749      assertEquals(1, currRow.size());
5750      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5751          .get(0).getRowLength(), row4, 0, row4.length));
5752      assertTrue(hasNext);
5753      currRow.clear();
5754      hasNext = scanner.next(currRow);
5755      assertEquals(1, currRow.size());
5756      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5757          .get(0).getRowLength(), row3, 0, row3.length));
5758      assertTrue(hasNext);
5759      currRow.clear();
5760      hasNext = scanner.next(currRow);
5761      assertEquals(1, currRow.size());
5762      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5763          .get(0).getRowLength(), row2, 0, row2.length));
5764      assertTrue(hasNext);
5765      currRow.clear();
5766      hasNext = scanner.next(currRow);
5767      assertEquals(1, currRow.size());
5768      assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5769          .get(0).getRowLength(), row1, 0, row1.length));
5770      assertFalse(hasNext);
5771    } finally {
5772      HBaseTestingUtility.closeRegionAndWAL(this.region);
5773      this.region = null;
5774    }
5775  }
5776
5777  /**
5778   * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking
5779   */
5780  @Test
5781  public void testReverseScanner_StackOverflow() throws IOException {
5782    byte[] cf1 = Bytes.toBytes("CF1");
5783    byte[][] families = {cf1};
5784    byte[] col = Bytes.toBytes("C");
5785    HBaseConfiguration conf = new HBaseConfiguration();
5786    this.region = initHRegion(tableName, method, conf, families);
5787    try {
5788      // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5789      Put put = new Put(Bytes.toBytes("19998"));
5790      put.addColumn(cf1, col, Bytes.toBytes("val"));
5791      region.put(put);
5792      region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5793      Put put2 = new Put(Bytes.toBytes("19997"));
5794      put2.addColumn(cf1, col, Bytes.toBytes("val"));
5795      region.put(put2);
5796
5797      Scan scan = new Scan(Bytes.toBytes("19998"));
5798      scan.setReversed(true);
5799      InternalScanner scanner = region.getScanner(scan);
5800
5801      // create one storefile contains many rows will be skipped
5802      // to check StoreFileScanner.seekToPreviousRow
5803      for (int i = 10000; i < 20000; i++) {
5804        Put p = new Put(Bytes.toBytes(""+i));
5805        p.addColumn(cf1, col, Bytes.toBytes("" + i));
5806        region.put(p);
5807      }
5808      region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5809
5810      // create one memstore contains many rows will be skipped
5811      // to check MemStoreScanner.seekToPreviousRow
5812      for (int i = 10000; i < 20000; i++) {
5813        Put p = new Put(Bytes.toBytes(""+i));
5814        p.addColumn(cf1, col, Bytes.toBytes("" + i));
5815        region.put(p);
5816      }
5817
5818      List<Cell> currRow = new ArrayList<>();
5819      boolean hasNext;
5820      do {
5821        hasNext = scanner.next(currRow);
5822      } while (hasNext);
5823      assertEquals(2, currRow.size());
5824      assertEquals("19998", Bytes.toString(currRow.get(0).getRowArray(),
5825        currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5826      assertEquals("19997", Bytes.toString(currRow.get(1).getRowArray(),
5827        currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5828    } finally {
5829      HBaseTestingUtility.closeRegionAndWAL(this.region);
5830      this.region = null;
5831    }
5832  }
5833
5834  @Test
5835  public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception {
5836    byte[] cf1 = Bytes.toBytes("CF1");
5837    byte[][] families = { cf1 };
5838    byte[] col = Bytes.toBytes("C");
5839    HBaseConfiguration conf = new HBaseConfiguration();
5840    this.region = initHRegion(tableName, method, conf, families);
5841    try {
5842      // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5843      Put put = new Put(Bytes.toBytes("19996"));
5844      put.addColumn(cf1, col, Bytes.toBytes("val"));
5845      region.put(put);
5846      Put put2 = new Put(Bytes.toBytes("19995"));
5847      put2.addColumn(cf1, col, Bytes.toBytes("val"));
5848      region.put(put2);
5849      // create a reverse scan
5850      Scan scan = new Scan(Bytes.toBytes("19996"));
5851      scan.setReversed(true);
5852      RegionScannerImpl scanner = region.getScanner(scan);
5853
5854      // flush the cache. This will reset the store scanner
5855      region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5856
5857      // create one memstore contains many rows will be skipped
5858      // to check MemStoreScanner.seekToPreviousRow
5859      for (int i = 10000; i < 20000; i++) {
5860        Put p = new Put(Bytes.toBytes("" + i));
5861        p.addColumn(cf1, col, Bytes.toBytes("" + i));
5862        region.put(p);
5863      }
5864      List<Cell> currRow = new ArrayList<>();
5865      boolean hasNext;
5866      boolean assertDone = false;
5867      do {
5868        hasNext = scanner.next(currRow);
5869        // With HBASE-15871, after the scanner is reset the memstore scanner should not be
5870        // added here
5871        if (!assertDone) {
5872          StoreScanner current =
5873              (StoreScanner) (scanner.storeHeap).getCurrentForTesting();
5874          List<KeyValueScanner> scanners = current.getAllScannersForTesting();
5875          assertEquals("There should be only one scanner the store file scanner", 1,
5876            scanners.size());
5877          assertDone = true;
5878        }
5879      } while (hasNext);
5880      assertEquals(2, currRow.size());
5881      assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(),
5882        currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5883      assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(),
5884        currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5885    } finally {
5886      HBaseTestingUtility.closeRegionAndWAL(this.region);
5887      this.region = null;
5888    }
5889  }
5890
5891  @Test
5892  public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception {
5893    byte[] cf1 = Bytes.toBytes("CF1");
5894    byte[][] families = { cf1 };
5895    byte[] col = Bytes.toBytes("C");
5896
5897    HBaseConfiguration conf = new HBaseConfiguration();
5898    this.region = initHRegion(tableName, method, conf, families);
5899
5900    Put put = new Put(Bytes.toBytes("199996"));
5901    put.addColumn(cf1, col, Bytes.toBytes("val"));
5902    region.put(put);
5903    Put put2 = new Put(Bytes.toBytes("199995"));
5904    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5905    region.put(put2);
5906
5907    // Create a reverse scan
5908    Scan scan = new Scan(Bytes.toBytes("199996"));
5909    scan.setReversed(true);
5910    RegionScannerImpl scanner = region.getScanner(scan);
5911
5912    // Put a lot of cells that have sequenceIDs grater than the readPt of the reverse scan
5913    for (int i = 100000; i < 200000; i++) {
5914      Put p = new Put(Bytes.toBytes("" + i));
5915      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5916      region.put(p);
5917    }
5918    List<Cell> currRow = new ArrayList<>();
5919    boolean hasNext;
5920    do {
5921      hasNext = scanner.next(currRow);
5922    } while (hasNext);
5923
5924    assertEquals(2, currRow.size());
5925    assertEquals("199996", Bytes.toString(currRow.get(0).getRowArray(),
5926      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5927    assertEquals("199995", Bytes.toString(currRow.get(1).getRowArray(),
5928      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5929  }
5930
5931  @Test
5932  public void testWriteRequestsCounter() throws IOException {
5933    byte[] fam = Bytes.toBytes("info");
5934    byte[][] families = { fam };
5935    this.region = initHRegion(tableName, method, CONF, families);
5936
5937    Assert.assertEquals(0L, region.getWriteRequestsCount());
5938
5939    Put put = new Put(row);
5940    put.addColumn(fam, fam, fam);
5941
5942    Assert.assertEquals(0L, region.getWriteRequestsCount());
5943    region.put(put);
5944    Assert.assertEquals(1L, region.getWriteRequestsCount());
5945    region.put(put);
5946    Assert.assertEquals(2L, region.getWriteRequestsCount());
5947    region.put(put);
5948    Assert.assertEquals(3L, region.getWriteRequestsCount());
5949
5950    region.delete(new Delete(row));
5951    Assert.assertEquals(4L, region.getWriteRequestsCount());
5952
5953    HBaseTestingUtility.closeRegionAndWAL(this.region);
5954    this.region = null;
5955  }
5956
5957  @Test
5958  public void testOpenRegionWrittenToWAL() throws Exception {
5959    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
5960    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5961
5962    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
5963      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
5964      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
5965    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
5966
5967    // open the region w/o rss and wal and flush some files
5968    HRegion region =
5969         HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
5970             .getConfiguration(), htd);
5971    assertNotNull(region);
5972
5973    // create a file in fam1 for the region before opening in OpenRegionHandler
5974    region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1));
5975    region.flush(true);
5976    HBaseTestingUtility.closeRegionAndWAL(region);
5977
5978    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5979
5980    // capture append() calls
5981    WAL wal = mockWAL();
5982    when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
5983
5984    try {
5985      region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5986        TEST_UTIL.getConfiguration(), rss, null);
5987
5988      verify(wal, times(1)).appendMarker((HRegionInfo)any(), (WALKeyImpl)any()
5989        , editCaptor.capture());
5990
5991      WALEdit edit = editCaptor.getValue();
5992      assertNotNull(edit);
5993      assertNotNull(edit.getCells());
5994      assertEquals(1, edit.getCells().size());
5995      RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5996      assertNotNull(desc);
5997
5998      LOG.info("RegionEventDescriptor from WAL: " + desc);
5999
6000      assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
6001      assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
6002      assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
6003        hri.getEncodedNameAsBytes()));
6004      assertTrue(desc.getLogSequenceNumber() > 0);
6005      assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
6006      assertEquals(2, desc.getStoresCount());
6007
6008      StoreDescriptor store = desc.getStores(0);
6009      assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
6010      assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
6011      assertEquals(1, store.getStoreFileCount()); // 1store file
6012      assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
6013
6014      store = desc.getStores(1);
6015      assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
6016      assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
6017      assertEquals(0, store.getStoreFileCount()); // no store files
6018    } finally {
6019      HBaseTestingUtility.closeRegionAndWAL(region);
6020    }
6021  }
6022
6023  // Helper for test testOpenRegionWrittenToWALForLogReplay
6024  static class HRegionWithSeqId extends HRegion {
6025    public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
6026        final Configuration confParam, final RegionInfo regionInfo,
6027        final TableDescriptor htd, final RegionServerServices rsServices) {
6028      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
6029    }
6030    @Override
6031    protected long getNextSequenceId(WAL wal) throws IOException {
6032      return 42;
6033    }
6034  }
6035
6036  @Test
6037  public void testFlushedFileWithNoTags() throws Exception {
6038    final TableName tableName = TableName.valueOf(name.getMethodName());
6039    HTableDescriptor htd = new HTableDescriptor(tableName);
6040    htd.addFamily(new HColumnDescriptor(fam1));
6041    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
6042    Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName());
6043    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
6044    Put put = new Put(Bytes.toBytes("a-b-0-0"));
6045    put.addColumn(fam1, qual1, Bytes.toBytes("c1-value"));
6046    region.put(put);
6047    region.flush(true);
6048    HStore store = region.getStore(fam1);
6049    Collection<HStoreFile> storefiles = store.getStorefiles();
6050    for (HStoreFile sf : storefiles) {
6051      assertFalse("Tags should not be present "
6052          ,sf.getReader().getHFileReader().getFileContext().isIncludesTags());
6053    }
6054  }
6055
6056  /**
6057   * Utility method to setup a WAL mock.
6058   * <p/>
6059   * Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs.
6060   * @return a mock WAL
6061   */
6062  private WAL mockWAL() throws IOException {
6063    WAL wal = mock(WAL.class);
6064    when(wal.appendData(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class)))
6065      .thenAnswer(new Answer<Long>() {
6066        @Override
6067        public Long answer(InvocationOnMock invocation) throws Throwable {
6068          WALKeyImpl key = invocation.getArgument(1);
6069          MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
6070          key.setWriteEntry(we);
6071          return 1L;
6072        }
6073      });
6074    when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))).
6075        thenAnswer(new Answer<Long>() {
6076          @Override
6077          public Long answer(InvocationOnMock invocation) throws Throwable {
6078            WALKeyImpl key = invocation.getArgument(1);
6079            MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
6080            key.setWriteEntry(we);
6081            return 1L;
6082          }
6083        });
6084    return wal;
6085  }
6086
6087  @Test
6088  public void testCloseRegionWrittenToWAL() throws Exception {
6089    Path rootDir = new Path(dir + name.getMethodName());
6090    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
6091
6092    final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
6093    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6094
6095    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
6096      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
6097      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
6098    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
6099
6100    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
6101
6102    // capture append() calls
6103    WAL wal = mockWAL();
6104    when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
6105
6106
6107    // create and then open a region first so that it can be closed later
6108    region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri));
6109    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
6110      TEST_UTIL.getConfiguration(), rss, null);
6111
6112    // close the region
6113    region.close(false);
6114
6115    // 2 times, one for region open, the other close region
6116    verify(wal, times(2)).appendMarker(any(RegionInfo.class),
6117        (WALKeyImpl) any(WALKeyImpl.class), editCaptor.capture());
6118
6119    WALEdit edit = editCaptor.getAllValues().get(1);
6120    assertNotNull(edit);
6121    assertNotNull(edit.getCells());
6122    assertEquals(1, edit.getCells().size());
6123    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
6124    assertNotNull(desc);
6125
6126    LOG.info("RegionEventDescriptor from WAL: " + desc);
6127
6128    assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType());
6129    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
6130    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
6131      hri.getEncodedNameAsBytes()));
6132    assertTrue(desc.getLogSequenceNumber() > 0);
6133    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
6134    assertEquals(2, desc.getStoresCount());
6135
6136    StoreDescriptor store = desc.getStores(0);
6137    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
6138    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
6139    assertEquals(0, store.getStoreFileCount()); // no store files
6140
6141    store = desc.getStores(1);
6142    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
6143    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
6144    assertEquals(0, store.getStoreFileCount()); // no store files
6145  }
6146
6147  /**
6148   * Test RegionTooBusyException thrown when region is busy
6149   */
6150  @Test
6151  public void testRegionTooBusy() throws IOException {
6152    byte[] family = Bytes.toBytes("family");
6153    long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration",
6154      HRegion.DEFAULT_BUSY_WAIT_DURATION);
6155    CONF.setLong("hbase.busy.wait.duration", 1000);
6156    region = initHRegion(tableName, method, CONF, family);
6157    final AtomicBoolean stopped = new AtomicBoolean(true);
6158    Thread t = new Thread(new Runnable() {
6159      @Override
6160      public void run() {
6161        try {
6162          region.lock.writeLock().lock();
6163          stopped.set(false);
6164          while (!stopped.get()) {
6165            Thread.sleep(100);
6166          }
6167        } catch (InterruptedException ie) {
6168        } finally {
6169          region.lock.writeLock().unlock();
6170        }
6171      }
6172    });
6173    t.start();
6174    Get get = new Get(row);
6175    try {
6176      while (stopped.get()) {
6177        Thread.sleep(100);
6178      }
6179      region.get(get);
6180      fail("Should throw RegionTooBusyException");
6181    } catch (InterruptedException ie) {
6182      fail("test interrupted");
6183    } catch (RegionTooBusyException e) {
6184      // Good, expected
6185    } finally {
6186      stopped.set(true);
6187      try {
6188        t.join();
6189      } catch (Throwable e) {
6190      }
6191
6192      HBaseTestingUtility.closeRegionAndWAL(region);
6193      region = null;
6194      CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration);
6195    }
6196  }
6197
  /**
   * Verifies cell expiry driven by per-cell TTL tags and by the column family TTL, and that an
   * {@link Increment} carrying a TTL falls back to the previously stored value once the
   * incremented cell expires.
   * <p>
   * Time is controlled through an injected {@link IncrementingEnvironmentEdge}; the "T+n"
   * offsets in the comments below are the cumulative amounts passed to
   * {@code edge.incrementTime} and are therefore approximate.
   * @throws IOException if the region cannot be created, mutated, read or closed
   */
  @Test
  public void testCellTTLs() throws IOException {
    // Install a controllable clock so the test can move time forward explicitly.
    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(edge);

    final byte[] row = Bytes.toBytes("testRow");
    final byte[] q1 = Bytes.toBytes("q1");
    final byte[] q2 = Bytes.toBytes("q2");
    final byte[] q3 = Bytes.toBytes("q3");
    final byte[] q4 = Bytes.toBytes("q4");

    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
    hcd.setTimeToLive(10); // 10 seconds
    htd.addFamily(hcd);

    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
    // The per-cell TTLs below are written as tags, so use an HFile version that supports tags.
    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);

    HRegion region = HBaseTestingUtility.createRegionAndWAL(new HRegionInfo(htd.getTableName(),
            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
        TEST_UTIL.getDataTestDir(), conf, htd);
    assertNotNull(region);
    try {
      long now = EnvironmentEdgeManager.currentTime();
      // Add a cell that will expire in 5 seconds via cell TTL
      region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
        HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
          // TTL tags specify ts in milliseconds
          new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
      // Add a cell that will expire after 10 seconds via family setting
      region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
      // Add a cell that will expire in 15 seconds via cell TTL
      // (timestamp is just under 10 seconds in the future, plus a 5 second TTL tag)
      region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
        HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
          // TTL tags specify ts in milliseconds
          new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
      // Add a cell that will expire in 20 seconds via family setting
      // (timestamp is just under 10 seconds in the future, plus the 10 second family TTL)
      region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));

      // Flush so we are sure store scanning gets this right
      region.flush(true);

      // A query at time T+0 should return all cells
      Result r = region.get(new Get(row));
      assertNotNull(r.getValue(fam1, q1));
      assertNotNull(r.getValue(fam1, q2));
      assertNotNull(r.getValue(fam1, q3));
      assertNotNull(r.getValue(fam1, q4));

      // Increment time to T+5 seconds: only q1 (5 second cell TTL) has expired
      edge.incrementTime(5000);

      r = region.get(new Get(row));
      assertNull(r.getValue(fam1, q1));
      assertNotNull(r.getValue(fam1, q2));
      assertNotNull(r.getValue(fam1, q3));
      assertNotNull(r.getValue(fam1, q4));

      // Increment time to T+10 seconds: q2 (10 second family TTL) has expired as well
      edge.incrementTime(5000);

      r = region.get(new Get(row));
      assertNull(r.getValue(fam1, q1));
      assertNull(r.getValue(fam1, q2));
      assertNotNull(r.getValue(fam1, q3));
      assertNotNull(r.getValue(fam1, q4));

      // Increment time to T+15 seconds: q3 has expired as well
      edge.incrementTime(5000);

      r = region.get(new Get(row));
      assertNull(r.getValue(fam1, q1));
      assertNull(r.getValue(fam1, q2));
      assertNull(r.getValue(fam1, q3));
      assertNotNull(r.getValue(fam1, q4));

      // Advance by a further 10 seconds, i.e. to T+25 seconds, which is past the T+20 expiry
      // of q4: every cell written above is gone now
      edge.incrementTime(10000);

      r = region.get(new Get(row));
      assertNull(r.getValue(fam1, q1));
      assertNull(r.getValue(fam1, q2));
      assertNull(r.getValue(fam1, q3));
      assertNull(r.getValue(fam1, q4));

      // Fun with disappearing increments

      // Start at 1 (written at roughly T+25 seconds, subject only to the family TTL)
      region.put(new Put(row).addColumn(fam1, q1, Bytes.toBytes(1L)));
      r = region.get(new Get(row));
      byte[] val = r.getValue(fam1, q1);
      assertNotNull(val);
      assertEquals(1L, Bytes.toLong(val));

      // Increment with a TTL of 5 seconds
      Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
      incr.setTTL(5000);
      region.increment(incr); // 2

      // New value should be 2
      r = region.get(new Get(row));
      val = r.getValue(fam1, q1);
      assertNotNull(val);
      assertEquals(2L, Bytes.toLong(val));

      // Increment time to T+30 seconds: the incremented cell's 5 second TTL has elapsed
      edge.incrementTime(5000);

      // Value should be back to 1
      r = region.get(new Get(row));
      val = r.getValue(fam1, q1);
      assertNotNull(val);
      assertEquals(1L, Bytes.toLong(val));

      // Increment time to T+35 seconds
      edge.incrementTime(5000);

      // Original value written at about T+25 should be gone now via the 10 second family TTL
      r = region.get(new Get(row));
      assertNull(r.getValue(fam1, q1));

    } finally {
      HBaseTestingUtility.closeRegionAndWAL(region);
    }
  }
6324
6325  @Test
6326  public void testIncrementTimestampsAreMonotonic() throws IOException {
6327    HRegion region = initHRegion(tableName, method, CONF, fam1);
6328    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6329    EnvironmentEdgeManager.injectEdge(edge);
6330
6331    edge.setValue(10);
6332    Increment inc = new Increment(row);
6333    inc.setDurability(Durability.SKIP_WAL);
6334    inc.addColumn(fam1, qual1, 1L);
6335    region.increment(inc);
6336
6337    Result result = region.get(new Get(row));
6338    Cell c = result.getColumnLatestCell(fam1, qual1);
6339    assertNotNull(c);
6340    assertEquals(10L, c.getTimestamp());
6341
6342    edge.setValue(1); // clock goes back
6343    region.increment(inc);
6344    result = region.get(new Get(row));
6345    c = result.getColumnLatestCell(fam1, qual1);
6346    assertEquals(11L, c.getTimestamp());
6347    assertEquals(2L, Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6348  }
6349
6350  @Test
6351  public void testAppendTimestampsAreMonotonic() throws IOException {
6352    HRegion region = initHRegion(tableName, method, CONF, fam1);
6353    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6354    EnvironmentEdgeManager.injectEdge(edge);
6355
6356    edge.setValue(10);
6357    Append a = new Append(row);
6358    a.setDurability(Durability.SKIP_WAL);
6359    a.addColumn(fam1, qual1, qual1);
6360    region.append(a);
6361
6362    Result result = region.get(new Get(row));
6363    Cell c = result.getColumnLatestCell(fam1, qual1);
6364    assertNotNull(c);
6365    assertEquals(10L, c.getTimestamp());
6366
6367    edge.setValue(1); // clock goes back
6368    region.append(a);
6369    result = region.get(new Get(row));
6370    c = result.getColumnLatestCell(fam1, qual1);
6371    assertEquals(11L, c.getTimestamp());
6372
6373    byte[] expected = new byte[qual1.length*2];
6374    System.arraycopy(qual1, 0, expected, 0, qual1.length);
6375    System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
6376
6377    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6378      expected, 0, expected.length));
6379  }
6380
6381  @Test
6382  public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
6383    HRegion region = initHRegion(tableName, method, CONF, fam1);
6384    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6385    EnvironmentEdgeManager.injectEdge(edge);
6386
6387    edge.setValue(10);
6388    Put p = new Put(row);
6389    p.setDurability(Durability.SKIP_WAL);
6390    p.addColumn(fam1, qual1, qual1);
6391    region.put(p);
6392
6393    Result result = region.get(new Get(row));
6394    Cell c = result.getColumnLatestCell(fam1, qual1);
6395    assertNotNull(c);
6396    assertEquals(10L, c.getTimestamp());
6397
6398    edge.setValue(1); // clock goes back
6399    p = new Put(row);
6400    p.setDurability(Durability.SKIP_WAL);
6401    p.addColumn(fam1, qual1, qual2);
6402    region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p);
6403    result = region.get(new Get(row));
6404    c = result.getColumnLatestCell(fam1, qual1);
6405    assertEquals(10L, c.getTimestamp());
6406
6407    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6408      qual2, 0, qual2.length));
6409  }
6410
6411  @Test
6412  public void testBatchMutateWithWrongRegionException() throws Exception {
6413    final byte[] a = Bytes.toBytes("a");
6414    final byte[] b = Bytes.toBytes("b");
6415    final byte[] c = Bytes.toBytes("c"); // exclusive
6416
6417    int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
6418    CONF.setInt("hbase.rowlock.wait.duration", 1000);
6419    final HRegion region = initHRegion(tableName, a, c, method, CONF, false, fam1);
6420
6421    Mutation[] mutations = new Mutation[] {
6422        new Put(a)
6423            .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6424              .setRow(a)
6425              .setFamily(fam1)
6426              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6427              .setType(Cell.Type.Put)
6428              .build()),
6429        // this is outside the region boundary
6430        new Put(c).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6431              .setRow(c)
6432              .setFamily(fam1)
6433              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6434              .setType(Type.Put)
6435              .build()),
6436        new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6437              .setRow(b)
6438              .setFamily(fam1)
6439              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6440              .setType(Cell.Type.Put)
6441              .build())
6442    };
6443
6444    OperationStatus[] status = region.batchMutate(mutations);
6445    assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6446    assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, status[1].getOperationStatusCode());
6447    assertEquals(OperationStatusCode.SUCCESS, status[2].getOperationStatusCode());
6448
6449
6450    // test with a row lock held for a long time
6451    final CountDownLatch obtainedRowLock = new CountDownLatch(1);
6452    ExecutorService exec = Executors.newFixedThreadPool(2);
6453    Future<Void> f1 = exec.submit(new Callable<Void>() {
6454      @Override
6455      public Void call() throws Exception {
6456        LOG.info("Acquiring row lock");
6457        RowLock rl = region.getRowLock(b);
6458        obtainedRowLock.countDown();
6459        LOG.info("Waiting for 5 seconds before releasing lock");
6460        Threads.sleep(5000);
6461        LOG.info("Releasing row lock");
6462        rl.release();
6463        return null;
6464      }
6465    });
6466    obtainedRowLock.await(30, TimeUnit.SECONDS);
6467
6468    Future<Void> f2 = exec.submit(new Callable<Void>() {
6469      @Override
6470      public Void call() throws Exception {
6471        Mutation[] mutations = new Mutation[] {
6472            new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6473                .setRow(a)
6474                .setFamily(fam1)
6475                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6476                .setType(Cell.Type.Put)
6477                .build()),
6478            new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6479                .setRow(b)
6480                .setFamily(fam1)
6481                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6482                .setType(Cell.Type.Put)
6483                .build()),
6484        };
6485
6486        // this will wait for the row lock, and it will eventually succeed
6487        OperationStatus[] status = region.batchMutate(mutations);
6488        assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6489        assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode());
6490        return null;
6491      }
6492    });
6493
6494    f1.get();
6495    f2.get();
6496
6497    CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
6498  }
6499
6500  @Test
6501  public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
6502    HRegion region = initHRegion(tableName, method, CONF, fam1);
6503    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6504    EnvironmentEdgeManager.injectEdge(edge);
6505
6506    edge.setValue(10);
6507    Put p = new Put(row);
6508    p.setDurability(Durability.SKIP_WAL);
6509    p.addColumn(fam1, qual1, qual1);
6510    region.put(p);
6511
6512    Result result = region.get(new Get(row));
6513    Cell c = result.getColumnLatestCell(fam1, qual1);
6514    assertNotNull(c);
6515    assertEquals(10L, c.getTimestamp());
6516
6517    edge.setValue(1); // clock goes back
6518    p = new Put(row);
6519    p.setDurability(Durability.SKIP_WAL);
6520    p.addColumn(fam1, qual1, qual2);
6521    RowMutations rm = new RowMutations(row);
6522    rm.add(p);
6523    assertTrue(region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL,
6524        new BinaryComparator(qual1), rm));
6525    result = region.get(new Get(row));
6526    c = result.getColumnLatestCell(fam1, qual1);
6527    assertEquals(10L, c.getTimestamp());
6528    LOG.info("c value " +
6529      Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6530
6531    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6532      qual2, 0, qual2.length));
6533  }
6534
6535  HRegion initHRegion(TableName tableName, String callingMethod,
6536      byte[]... families) throws IOException {
6537    return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
6538        families);
6539  }
6540
6541  /**
6542   * HBASE-16429 Make sure no stuck if roll writer when ring buffer is filled with appends
6543   * @throws IOException if IO error occurred during test
6544   */
6545  @Test
6546  public void testWritesWhileRollWriter() throws IOException {
6547    int testCount = 10;
6548    int numRows = 1024;
6549    int numFamilies = 2;
6550    int numQualifiers = 2;
6551    final byte[][] families = new byte[numFamilies][];
6552    for (int i = 0; i < numFamilies; i++) {
6553      families[i] = Bytes.toBytes("family" + i);
6554    }
6555    final byte[][] qualifiers = new byte[numQualifiers][];
6556    for (int i = 0; i < numQualifiers; i++) {
6557      qualifiers[i] = Bytes.toBytes("qual" + i);
6558    }
6559
6560    CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
6561    this.region = initHRegion(tableName, method, CONF, families);
6562    try {
6563      List<Thread> threads = new ArrayList<>();
6564      for (int i = 0; i < numRows; i++) {
6565        final int count = i;
6566        Thread t = new Thread(new Runnable() {
6567
6568          @Override
6569          public void run() {
6570            byte[] row = Bytes.toBytes("row" + count);
6571            Put put = new Put(row);
6572            put.setDurability(Durability.SYNC_WAL);
6573            byte[] value = Bytes.toBytes(String.valueOf(count));
6574            for (byte[] family : families) {
6575              for (byte[] qualifier : qualifiers) {
6576                put.addColumn(family, qualifier, count, value);
6577              }
6578            }
6579            try {
6580              region.put(put);
6581            } catch (IOException e) {
6582              throw new RuntimeException(e);
6583            }
6584          }
6585        });
6586        threads.add(t);
6587      }
6588      for (Thread t : threads) {
6589        t.start();
6590      }
6591
6592      for (int i = 0; i < testCount; i++) {
6593        region.getWAL().rollWriter();
6594        Thread.yield();
6595      }
6596    } finally {
6597      try {
6598        HBaseTestingUtility.closeRegionAndWAL(this.region);
6599        CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
6600      } catch (DroppedSnapshotException dse) {
6601        // We could get this on way out because we interrupt the background flusher and it could
6602        // fail anywhere causing a DSE over in the background flusher... only it is not properly
6603        // dealt with so could still be memory hanging out when we get to here -- memory we can't
6604        // flush because the accounting is 'off' since original DSE.
6605      }
6606      this.region = null;
6607    }
6608  }
6609
6610  @Test
6611  public void testMutateRow_WriteRequestCount() throws Exception {
6612    byte[] row1 = Bytes.toBytes("row1");
6613    byte[] fam1 = Bytes.toBytes("fam1");
6614    byte[] qf1 = Bytes.toBytes("qualifier");
6615    byte[] val1 = Bytes.toBytes("value1");
6616
6617    RowMutations rm = new RowMutations(row1);
6618    Put put = new Put(row1);
6619    put.addColumn(fam1, qf1, val1);
6620    rm.add(put);
6621
6622    this.region = initHRegion(tableName, method, CONF, fam1);
6623    try {
6624      long wrcBeforeMutate = this.region.writeRequestsCount.longValue();
6625      this.region.mutateRow(rm);
6626      long wrcAfterMutate = this.region.writeRequestsCount.longValue();
6627      Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate);
6628    } finally {
6629      HBaseTestingUtility.closeRegionAndWAL(this.region);
6630      this.region = null;
6631    }
6632  }
6633
6634  @Test
6635  public void testBulkLoadReplicationEnabled() throws IOException {
6636    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
6637    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
6638    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6639
6640    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
6641    htd.addFamily(new HColumnDescriptor(fam1));
6642    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6643        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6644    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(),
6645        rss, null);
6646
6647    assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false));
6648    String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
6649    String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();
6650    assertTrue(plugins.contains(replicationCoprocessorClass));
6651    assertTrue(region.getCoprocessorHost().
6652        getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
6653
6654    region.close();
6655  }
6656
6657  /**
6658   * The same as HRegion class, the only difference is that instantiateHStore will
6659   * create a different HStore - HStoreForTesting. [HBASE-8518]
6660   */
6661  public static class HRegionForTesting extends HRegion {
6662
6663    public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs,
6664                             final Configuration confParam, final RegionInfo regionInfo,
6665                             final TableDescriptor htd, final RegionServerServices rsServices) {
6666      this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
6667          wal, confParam, htd, rsServices);
6668    }
6669
6670    public HRegionForTesting(HRegionFileSystem fs, WAL wal,
6671                             Configuration confParam, TableDescriptor htd,
6672                             RegionServerServices rsServices) {
6673      super(fs, wal, confParam, htd, rsServices);
6674    }
6675
6676    /**
6677     * Create HStore instance.
6678     * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting.
6679     */
6680    @Override
6681    protected HStore instantiateHStore(final ColumnFamilyDescriptor family) throws IOException {
6682      if (family.isMobEnabled()) {
6683        if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
6684          throw new IOException("A minimum HFile version of "
6685              + HFile.MIN_FORMAT_VERSION_WITH_TAGS
6686              + " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY
6687              + " accordingly.");
6688        }
6689        return new HMobStore(this, family, this.conf);
6690      }
6691      return new HStoreForTesting(this, family, this.conf);
6692    }
6693  }
6694
6695  /**
6696   * HStoreForTesting is merely the same as HStore, the difference is in the doCompaction method
6697   * of HStoreForTesting there is a checkpoint "hbase.hstore.compaction.complete" which
6698   * doesn't let hstore compaction complete. In the former edition, this config is set in
6699   * HStore class inside compact method, though this is just for testing, otherwise it
6700   * doesn't do any help. In HBASE-8518, we try to get rid of all "hbase.hstore.compaction.complete"
6701   * config (except for testing code).
6702   */
6703  public static class HStoreForTesting extends HStore {
6704
6705    protected HStoreForTesting(final HRegion region,
6706        final ColumnFamilyDescriptor family,
6707        final Configuration confParam) throws IOException {
6708      super(region, family, confParam);
6709    }
6710
6711    @Override
6712    protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
6713        Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
6714        List<Path> newFiles) throws IOException {
6715      // let compaction incomplete.
6716      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
6717        LOG.warn("hbase.hstore.compaction.complete is set to false");
6718        List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
6719        final boolean evictOnClose =
6720            cacheConf != null? cacheConf.shouldEvictOnClose(): true;
6721        for (Path newFile : newFiles) {
6722          // Create storefile around what we wrote with a reader on it.
6723          HStoreFile sf = createStoreFileAndReader(newFile);
6724          sf.closeStoreFile(evictOnClose);
6725          sfs.add(sf);
6726        }
6727        return sfs;
6728      }
6729      return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles);
6730    }
6731  }
6732}