001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
023import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
024import static org.junit.Assert.assertArrayEquals;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertFalse;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertNull;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031import static org.mockito.ArgumentMatchers.any;
032import static org.mockito.ArgumentMatchers.anyLong;
033import static org.mockito.Mockito.doThrow;
034import static org.mockito.Mockito.mock;
035import static org.mockito.Mockito.never;
036import static org.mockito.Mockito.spy;
037import static org.mockito.Mockito.times;
038import static org.mockito.Mockito.verify;
039import static org.mockito.Mockito.when;
040
041import java.io.IOException;
042import java.io.InterruptedIOException;
043import java.math.BigDecimal;
044import java.nio.charset.StandardCharsets;
045import java.security.PrivilegedExceptionAction;
046import java.util.ArrayList;
047import java.util.Arrays;
048import java.util.Collection;
049import java.util.List;
050import java.util.Map;
051import java.util.NavigableMap;
052import java.util.Objects;
053import java.util.TreeMap;
054import java.util.concurrent.Callable;
055import java.util.concurrent.CountDownLatch;
056import java.util.concurrent.ExecutorService;
057import java.util.concurrent.Executors;
058import java.util.concurrent.Future;
059import java.util.concurrent.TimeUnit;
060import java.util.concurrent.atomic.AtomicBoolean;
061import java.util.concurrent.atomic.AtomicInteger;
062import java.util.concurrent.atomic.AtomicReference;
063import org.apache.commons.lang3.RandomStringUtils;
064import org.apache.hadoop.conf.Configuration;
065import org.apache.hadoop.fs.FSDataOutputStream;
066import org.apache.hadoop.fs.FileStatus;
067import org.apache.hadoop.fs.FileSystem;
068import org.apache.hadoop.fs.Path;
069import org.apache.hadoop.hbase.ArrayBackedTag;
070import org.apache.hadoop.hbase.Cell;
071import org.apache.hadoop.hbase.Cell.Type;
072import org.apache.hadoop.hbase.CellBuilderFactory;
073import org.apache.hadoop.hbase.CellBuilderType;
074import org.apache.hadoop.hbase.CellUtil;
075import org.apache.hadoop.hbase.CompareOperator;
076import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
077import org.apache.hadoop.hbase.DoNotRetryIOException;
078import org.apache.hadoop.hbase.DroppedSnapshotException;
079import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
080import org.apache.hadoop.hbase.HBaseClassTestRule;
081import org.apache.hadoop.hbase.HBaseConfiguration;
082import org.apache.hadoop.hbase.HBaseTestingUtility;
083import org.apache.hadoop.hbase.HConstants;
084import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
085import org.apache.hadoop.hbase.HDFSBlocksDistribution;
086import org.apache.hadoop.hbase.KeyValue;
087import org.apache.hadoop.hbase.MiniHBaseCluster;
088import org.apache.hadoop.hbase.MultithreadedTestUtil;
089import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
090import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
091import org.apache.hadoop.hbase.NotServingRegionException;
092import org.apache.hadoop.hbase.PrivateCellUtil;
093import org.apache.hadoop.hbase.RegionTooBusyException;
094import org.apache.hadoop.hbase.ServerName;
095import org.apache.hadoop.hbase.StartMiniClusterOption;
096import org.apache.hadoop.hbase.TableName;
097import org.apache.hadoop.hbase.TagType;
098import org.apache.hadoop.hbase.Waiter;
099import org.apache.hadoop.hbase.client.Append;
100import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
101import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
102import org.apache.hadoop.hbase.client.Delete;
103import org.apache.hadoop.hbase.client.Durability;
104import org.apache.hadoop.hbase.client.Get;
105import org.apache.hadoop.hbase.client.Increment;
106import org.apache.hadoop.hbase.client.Mutation;
107import org.apache.hadoop.hbase.client.Put;
108import org.apache.hadoop.hbase.client.RegionInfo;
109import org.apache.hadoop.hbase.client.RegionInfoBuilder;
110import org.apache.hadoop.hbase.client.Result;
111import org.apache.hadoop.hbase.client.RowMutations;
112import org.apache.hadoop.hbase.client.Scan;
113import org.apache.hadoop.hbase.client.Table;
114import org.apache.hadoop.hbase.client.TableDescriptor;
115import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
116import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
117import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
118import org.apache.hadoop.hbase.filter.BigDecimalComparator;
119import org.apache.hadoop.hbase.filter.BinaryComparator;
120import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
121import org.apache.hadoop.hbase.filter.Filter;
122import org.apache.hadoop.hbase.filter.FilterBase;
123import org.apache.hadoop.hbase.filter.FilterList;
124import org.apache.hadoop.hbase.filter.NullComparator;
125import org.apache.hadoop.hbase.filter.PrefixFilter;
126import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
127import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
128import org.apache.hadoop.hbase.filter.SubstringComparator;
129import org.apache.hadoop.hbase.filter.ValueFilter;
130import org.apache.hadoop.hbase.io.TimeRange;
131import org.apache.hadoop.hbase.io.hfile.HFile;
132import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
133import org.apache.hadoop.hbase.monitoring.MonitoredTask;
134import org.apache.hadoop.hbase.monitoring.TaskMonitor;
135import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
136import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
137import org.apache.hadoop.hbase.regionserver.Region.RowLock;
138import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
139import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
140import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
141import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
142import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
143import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
144import org.apache.hadoop.hbase.security.User;
145import org.apache.hadoop.hbase.test.MetricsAssertHelper;
146import org.apache.hadoop.hbase.testclassification.LargeTests;
147import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
148import org.apache.hadoop.hbase.util.Bytes;
149import org.apache.hadoop.hbase.util.CommonFSUtils;
150import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
151import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
152import org.apache.hadoop.hbase.util.HFileArchiveUtil;
153import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
154import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
155import org.apache.hadoop.hbase.util.Threads;
156import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
157import org.apache.hadoop.hbase.wal.FaultyFSLog;
158import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
159import org.apache.hadoop.hbase.wal.WAL;
160import org.apache.hadoop.hbase.wal.WALEdit;
161import org.apache.hadoop.hbase.wal.WALFactory;
162import org.apache.hadoop.hbase.wal.WALKeyImpl;
163import org.apache.hadoop.hbase.wal.WALProvider;
164import org.apache.hadoop.hbase.wal.WALProvider.Writer;
165import org.apache.hadoop.hbase.wal.WALSplitUtil;
166import org.junit.After;
167import org.junit.Assert;
168import org.junit.Before;
169import org.junit.ClassRule;
170import org.junit.Rule;
171import org.junit.Test;
172import org.junit.experimental.categories.Category;
173import org.junit.rules.ExpectedException;
174import org.junit.rules.TestName;
175import org.mockito.ArgumentCaptor;
176import org.mockito.ArgumentMatcher;
177import org.mockito.Mockito;
178import org.mockito.invocation.InvocationOnMock;
179import org.mockito.stubbing.Answer;
180import org.slf4j.Logger;
181import org.slf4j.LoggerFactory;
182
183import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
184import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
185import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
186import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
187import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
188
189import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
190import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
191import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
192import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
193import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
196
197/**
198 * Basic stand-alone testing of HRegion.  No clusters!
199 *
200 * A lot of the meta information for an HRegion now lives inside other HRegions
201 * or in the HBaseMaster, so only basic testing is possible.
202 */
203@Category({VerySlowRegionServerTests.class, LargeTests.class})
204@SuppressWarnings("deprecation")
205public class TestHRegion {
206
207  @ClassRule
208  public static final HBaseClassTestRule CLASS_RULE =
209      HBaseClassTestRule.forClass(TestHRegion.class);
210
211  // Do not spin up clusters in here. If you need to spin up a cluster, do it
212  // over in TestHRegionOnCluster.
213  private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class);
214  @Rule
215  public TestName name = new TestName();
216  @Rule public final ExpectedException thrown = ExpectedException.none();
217
218  private static final String COLUMN_FAMILY = "MyCF";
219  private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
220  private static final EventLoopGroup GROUP = new NioEventLoopGroup();
221
222  HRegion region = null;
223  // Do not run unit tests in parallel (? Why not?  It don't work?  Why not?  St.Ack)
224  protected static HBaseTestingUtility TEST_UTIL;
225  public static Configuration CONF ;
226  private String dir;
227  private final int MAX_VERSIONS = 2;
228
229  // Test names
230  protected TableName tableName;
231  protected String method;
232  protected final byte[] qual = Bytes.toBytes("qual");
233  protected final byte[] qual1 = Bytes.toBytes("qual1");
234  protected final byte[] qual2 = Bytes.toBytes("qual2");
235  protected final byte[] qual3 = Bytes.toBytes("qual3");
236  protected final byte[] value = Bytes.toBytes("value");
237  protected final byte[] value1 = Bytes.toBytes("value1");
238  protected final byte[] value2 = Bytes.toBytes("value2");
239  protected final byte[] row = Bytes.toBytes("rowA");
240  protected final byte[] row2 = Bytes.toBytes("rowB");
241
242  protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory
243      .getInstance(MetricsAssertHelper.class);
244
245  @Before
246  public void setup() throws IOException {
247    TEST_UTIL = new HBaseTestingUtility();
248    CONF = TEST_UTIL.getConfiguration();
249    NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, GROUP, NioSocketChannel.class);
250    dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
251    method = name.getMethodName();
252    tableName = TableName.valueOf(method);
253    CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09));
254  }
255
256  @After
257  public void tearDown() throws IOException {
258    // Region may have been closed, but it is still no harm if we close it again here using HTU.
259    HBaseTestingUtility.closeRegionAndWAL(region);
260    EnvironmentEdgeManagerTestHelper.reset();
261    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
262    TEST_UTIL.cleanupTestDir();
263  }
264
265  /**
266   * Test that I can use the max flushed sequence id after the close.
267   * @throws IOException
268   */
269  @Test
270  public void testSequenceId() throws IOException {
271    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
272    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
273    // Weird. This returns 0 if no store files or no edits. Afraid to change it.
274    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
275    HBaseTestingUtility.closeRegionAndWAL(this.region);
276    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
277    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
278    // Open region again.
279    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
280    byte [] value = Bytes.toBytes(method);
281    // Make a random put against our cf.
282    Put put = new Put(value);
283    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
284    region.put(put);
285    // No flush yet so init numbers should still be in place.
286    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
287    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
288    region.flush(true);
289    long max = region.getMaxFlushedSeqId();
290    HBaseTestingUtility.closeRegionAndWAL(this.region);
291    assertEquals(max, region.getMaxFlushedSeqId());
292    this.region = null;
293  }
294
295  /**
296   * Test for Bug 2 of HBASE-10466.
297   * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
298   * is smaller than a certain value, or when region close starts a flush is ongoing, the first
299   * flush is skipped and only the second flush takes place. However, two flushes are required in
300   * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
301   * in current memstore. The fix is removing all conditions except abort check so we ensure 2
302   * flushes for region close."
303   * @throws IOException
304   */
305  @Test
306  public void testCloseCarryingSnapshot() throws IOException {
307    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
308    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
309    // Get some random bytes.
310    byte [] value = Bytes.toBytes(method);
311    // Make a random put against our cf.
312    Put put = new Put(value);
313    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
314    // First put something in current memstore, which will be in snapshot after flusher.prepare()
315    region.put(put);
316    StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
317    storeFlushCtx.prepare();
318    // Second put something in current memstore
319    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
320    region.put(put);
321    // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
322    HBaseTestingUtility.closeRegionAndWAL(region);
323    assertEquals(0, region.getMemStoreDataSize());
324    region = null;
325  }
326
327  /*
328   * This test is for verifying memstore snapshot size is correctly updated in case of rollback
329   * See HBASE-10845
330   */
331  @Test
332  public void testMemstoreSnapshotSize() throws IOException {
333    class MyFaultyFSLog extends FaultyFSLog {
334      StoreFlushContext storeFlushCtx;
335      public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
336          throws IOException {
337        super(fs, rootDir, logName, conf);
338      }
339
340      void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
341        this.storeFlushCtx = storeFlushCtx;
342      }
343
344      @Override
345      public void sync(long txid) throws IOException {
346        storeFlushCtx.prepare();
347        super.sync(txid);
348      }
349    }
350
351    FileSystem fs = FileSystem.get(CONF);
352    Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
353    MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
354    faultyLog.init();
355    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog,
356        COLUMN_FAMILY_BYTES);
357
358    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
359    // Get some random bytes.
360    byte [] value = Bytes.toBytes(method);
361    faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY));
362
363    Put put = new Put(value);
364    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
365    faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
366    boolean threwIOE = false;
367    try {
368      region.put(put);
369    } catch (IOException ioe) {
370      threwIOE = true;
371    } finally {
372      assertTrue("The regionserver should have thrown an exception", threwIOE);
373    }
374    MemStoreSize mss = store.getFlushableSize();
375    assertTrue("flushable size should be zero, but it is " + mss,
376        mss.getDataSize() == 0);
377  }
378
379  /**
380   * Create a WAL outside of the usual helper in
381   * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method
382   * doesn't play nicely with FaultyFileSystem. Call this method before overriding
383   * {@code fs.file.impl}.
384   * @param callingMethod a unique component for the path, probably the name of the test method.
385   */
386  private static WAL createWALCompatibleWithFaultyFileSystem(String callingMethod,
387      Configuration conf, TableName tableName) throws IOException {
388    final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
389    final Configuration walConf = new Configuration(conf);
390    CommonFSUtils.setRootDir(walConf, logDir);
391    return new WALFactory(walConf, callingMethod)
392        .getWAL(RegionInfoBuilder.newBuilder(tableName).build());
393  }
394
395  @Test
396  public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
397    String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
398    FileSystem fs = FileSystem.get(CONF);
399    Path rootDir = new Path(dir + testName);
400    FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
401    hLog.init();
402    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
403        COLUMN_FAMILY_BYTES);
404    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
405    assertEquals(0, region.getMemStoreDataSize());
406
407    // Put one value
408    byte [] value = Bytes.toBytes(method);
409    Put put = new Put(value);
410    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
411    region.put(put);
412    long onePutSize = region.getMemStoreDataSize();
413    assertTrue(onePutSize > 0);
414
415    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
416    doThrow(new IOException())
417       .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
418    region.setCoprocessorHost(mockedCPHost);
419
420    put = new Put(value);
421    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
422    try {
423      region.put(put);
424      fail("Should have failed with IOException");
425    } catch (IOException expected) {
426    }
427    long expectedSize = onePutSize * 2;
428    assertEquals("memstoreSize should be incremented",
429        expectedSize, region.getMemStoreDataSize());
430    assertEquals("flushable size should be incremented",
431        expectedSize, store.getFlushableSize().getDataSize());
432
433    region.setCoprocessorHost(null);
434  }
435
436  /**
437   * A test case of HBASE-21041
438   * @throws Exception Exception
439   */
440  @Test
441  public void testFlushAndMemstoreSizeCounting() throws Exception {
442    byte[] family = Bytes.toBytes("family");
443    this.region = initHRegion(tableName, method, CONF, family);
444    final WALFactory wals = new WALFactory(CONF, method);
445    try {
446      for (byte[] row : HBaseTestingUtility.ROWS) {
447        Put put = new Put(row);
448        put.addColumn(family, family, row);
449        region.put(put);
450      }
451      region.flush(true);
452      // After flush, data size should be zero
453      assertEquals(0, region.getMemStoreDataSize());
454      // After flush, a new active mutable segment is created, so the heap size
455      // should equal to MutableSegment.DEEP_OVERHEAD
456      assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
457      // After flush, offheap should be zero
458      assertEquals(0, region.getMemStoreOffHeapSize());
459    } finally {
460      HBaseTestingUtility.closeRegionAndWAL(this.region);
461      this.region = null;
462      wals.close();
463    }
464  }
465
466  /**
467   * Test we do not lose data if we fail a flush and then close.
468   * Part of HBase-10466.  Tests the following from the issue description:
469   * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
470   * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
471   * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
472   * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
473   * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
474   * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
475   * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
476   * much smaller than expected. In extreme case, if the error accumulates to even bigger than
477   * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
478   * if memstoreSize is not larger than 0."
479   * @throws Exception
480   */
481  @Test
482  public void testFlushSizeAccounting() throws Exception {
483    final Configuration conf = HBaseConfiguration.create(CONF);
484    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
485    // Only retry once.
486    conf.setInt("hbase.hstore.flush.retries.number", 1);
487    final User user =
488      User.createUserForTesting(conf, method, new String[]{"foo"});
489    // Inject our faulty LocalFileSystem
490    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
491    user.runAs(new PrivilegedExceptionAction<Object>() {
492      @Override
493      public Object run() throws Exception {
494        // Make sure it worked (above is sensitive to caching details in hadoop core)
495        FileSystem fs = FileSystem.get(conf);
496        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
497        FaultyFileSystem ffs = (FaultyFileSystem)fs;
498        HRegion region = null;
499        try {
500          // Initialize region
501          region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal,
502              COLUMN_FAMILY_BYTES);
503          long size = region.getMemStoreDataSize();
504          Assert.assertEquals(0, size);
505          // Put one item into memstore.  Measure the size of one item in memstore.
506          Put p1 = new Put(row);
507          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null));
508          region.put(p1);
509          final long sizeOfOnePut = region.getMemStoreDataSize();
510          // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
511          try {
512            LOG.info("Flushing");
513            region.flush(true);
514            Assert.fail("Didn't bubble up IOE!");
515          } catch (DroppedSnapshotException dse) {
516            // What we are expecting
517            region.closing.set(false); // this is needed for the rest of the test to work
518          }
519          // Make it so all writes succeed from here on out
520          ffs.fault.set(false);
521          // Check sizes.  Should still be the one entry.
522          Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize());
523          // Now add two entries so that on this next flush that fails, we can see if we
524          // subtract the right amount, the snapshot size only.
525          Put p2 = new Put(row);
526          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
527          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
528          region.put(p2);
529          long expectedSize = sizeOfOnePut * 3;
530          Assert.assertEquals(expectedSize, region.getMemStoreDataSize());
531          // Do a successful flush.  It will clear the snapshot only.  Thats how flushes work.
532          // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
533          // it
534          region.flush(true);
535          // Make sure our memory accounting is right.
536          Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize());
537        } finally {
538          HBaseTestingUtility.closeRegionAndWAL(region);
539        }
540        return null;
541      }
542    });
543    FileSystem.closeAllForUGI(user.getUGI());
544  }
545
546  @Test
547  public void testCloseWithFailingFlush() throws Exception {
548    final Configuration conf = HBaseConfiguration.create(CONF);
549    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
550    // Only retry once.
551    conf.setInt("hbase.hstore.flush.retries.number", 1);
552    final User user =
553      User.createUserForTesting(conf, this.method, new String[]{"foo"});
554    // Inject our faulty LocalFileSystem
555    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
556    user.runAs(new PrivilegedExceptionAction<Object>() {
557      @Override
558      public Object run() throws Exception {
559        // Make sure it worked (above is sensitive to caching details in hadoop core)
560        FileSystem fs = FileSystem.get(conf);
561        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
562        FaultyFileSystem ffs = (FaultyFileSystem)fs;
563        HRegion region = null;
564        try {
565          // Initialize region
566          region = initHRegion(tableName, null, null, false,
567              Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
568          long size = region.getMemStoreDataSize();
569          Assert.assertEquals(0, size);
570          // Put one item into memstore.  Measure the size of one item in memstore.
571          Put p1 = new Put(row);
572          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
573          region.put(p1);
574          // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
575          HStore store = region.getStore(COLUMN_FAMILY_BYTES);
576          StoreFlushContext storeFlushCtx =
577              store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
578          storeFlushCtx.prepare();
579          // Now add two entries to the foreground memstore.
580          Put p2 = new Put(row);
581          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
582          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
583          region.put(p2);
584          // Now try close on top of a failing flush.
585          HBaseTestingUtility.closeRegionAndWAL(region);
586          region = null;
587          fail();
588        } catch (DroppedSnapshotException dse) {
589          // Expected
590          LOG.info("Expected DroppedSnapshotException");
591        } finally {
592          // Make it so all writes succeed from here on out so can close clean
593          ffs.fault.set(false);
594          HBaseTestingUtility.closeRegionAndWAL(region);
595        }
596        return null;
597      }
598    });
599    FileSystem.closeAllForUGI(user.getUGI());
600  }
601
602  @Test
603  public void testCompactionAffectedByScanners() throws Exception {
604    byte[] family = Bytes.toBytes("family");
605    this.region = initHRegion(tableName, method, CONF, family);
606
607    Put put = new Put(Bytes.toBytes("r1"));
608    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
609    region.put(put);
610    region.flush(true);
611
612    Scan scan = new Scan();
613    scan.readVersions(3);
614    // open the first scanner
615    RegionScanner scanner1 = region.getScanner(scan);
616
617    Delete delete = new Delete(Bytes.toBytes("r1"));
618    region.delete(delete);
619    region.flush(true);
620
621    // open the second scanner
622    RegionScanner scanner2 = region.getScanner(scan);
623
624    List<Cell> results = new ArrayList<>();
625
626    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
627
628    // make a major compaction
629    region.compact(true);
630
631    // open the third scanner
632    RegionScanner scanner3 = region.getScanner(scan);
633
634    // get data from scanner 1, 2, 3 after major compaction
635    scanner1.next(results);
636    System.out.println(results);
637    assertEquals(1, results.size());
638
639    results.clear();
640    scanner2.next(results);
641    System.out.println(results);
642    assertEquals(0, results.size());
643
644    results.clear();
645    scanner3.next(results);
646    System.out.println(results);
647    assertEquals(0, results.size());
648  }
649
650  @Test
651  public void testToShowNPEOnRegionScannerReseek() throws Exception {
652    byte[] family = Bytes.toBytes("family");
653    this.region = initHRegion(tableName, method, CONF, family);
654
655    Put put = new Put(Bytes.toBytes("r1"));
656    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
657    region.put(put);
658    put = new Put(Bytes.toBytes("r2"));
659    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
660    region.put(put);
661    region.flush(true);
662
663    Scan scan = new Scan();
664    scan.readVersions(3);
665    // open the first scanner
666    RegionScanner scanner1 = region.getScanner(scan);
667
668    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
669
670    region.compact(true);
671
672    scanner1.reseek(Bytes.toBytes("r2"));
673    List<Cell> results = new ArrayList<>();
674    scanner1.next(results);
675    Cell keyValue = results.get(0);
676    Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0);
677    scanner1.close();
678  }
679
680  @Test
681  public void testArchiveRecoveredEditsReplay() throws Exception {
682    byte[] family = Bytes.toBytes("family");
683    this.region = initHRegion(tableName, method, CONF, family);
684    final WALFactory wals = new WALFactory(CONF, method);
685    try {
686      Path regiondir = region.getRegionFileSystem().getRegionDir();
687      FileSystem fs = region.getRegionFileSystem().getFileSystem();
688      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
689
690      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
691
692      long maxSeqId = 1050;
693      long minSeqId = 1000;
694
695      for (long i = minSeqId; i <= maxSeqId; i += 10) {
696        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
697        fs.create(recoveredEdits);
698        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
699
700        long time = System.nanoTime();
701        WALEdit edit = new WALEdit();
702        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
703          .toBytes(i)));
704        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
705          HConstants.DEFAULT_CLUSTER_ID), edit));
706
707        writer.close();
708      }
709      MonitoredTask status = TaskMonitor.get().createStatus(method);
710      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
711      for (HStore store : region.getStores()) {
712        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
713      }
714      CONF.set("hbase.region.archive.recovered.edits", "true");
715      CONF.set(CommonFSUtils.HBASE_WAL_DIR, "/custom_wal_dir");
716      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
717      assertEquals(maxSeqId, seqId);
718      region.getMVCC().advanceTo(seqId);
719      String fakeFamilyName = recoveredEditsDir.getName();
720      Path rootDir = new Path(CONF.get(HConstants.HBASE_DIR));
721      Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePathForRootDir(rootDir,
722        region.getRegionInfo(), Bytes.toBytes(fakeFamilyName));
723      FileStatus[] list = TEST_UTIL.getTestFileSystem().listStatus(storeArchiveDir);
724      assertEquals(6, list.length);
725    } finally {
726      CONF.set("hbase.region.archive.recovered.edits", "false");
727      CONF.set(CommonFSUtils.HBASE_WAL_DIR, "");
728      HBaseTestingUtility.closeRegionAndWAL(this.region);
729      this.region = null;
730      wals.close();
731    }
732  }
733
734  @Test
735  public void testSkipRecoveredEditsReplay() throws Exception {
736    byte[] family = Bytes.toBytes("family");
737    this.region = initHRegion(tableName, method, CONF, family);
738    final WALFactory wals = new WALFactory(CONF, method);
739    try {
740      Path regiondir = region.getRegionFileSystem().getRegionDir();
741      FileSystem fs = region.getRegionFileSystem().getFileSystem();
742      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
743
744      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
745
746      long maxSeqId = 1050;
747      long minSeqId = 1000;
748
749      for (long i = minSeqId; i <= maxSeqId; i += 10) {
750        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
751        fs.create(recoveredEdits);
752        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
753
754        long time = System.nanoTime();
755        WALEdit edit = new WALEdit();
756        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
757            .toBytes(i)));
758        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
759            HConstants.DEFAULT_CLUSTER_ID), edit));
760
761        writer.close();
762      }
763      MonitoredTask status = TaskMonitor.get().createStatus(method);
764      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
765      for (HStore store : region.getStores()) {
766        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
767      }
768      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
769      assertEquals(maxSeqId, seqId);
770      region.getMVCC().advanceTo(seqId);
771      Get get = new Get(row);
772      Result result = region.get(get);
773      for (long i = minSeqId; i <= maxSeqId; i += 10) {
774        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
775        assertEquals(1, kvs.size());
776        assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
777      }
778    } finally {
779      HBaseTestingUtility.closeRegionAndWAL(this.region);
780      this.region = null;
781      wals.close();
782    }
783  }
784
785  @Test
786  public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
787    byte[] family = Bytes.toBytes("family");
788    this.region = initHRegion(tableName, method, CONF, family);
789    final WALFactory wals = new WALFactory(CONF, method);
790    try {
791      Path regiondir = region.getRegionFileSystem().getRegionDir();
792      FileSystem fs = region.getRegionFileSystem().getFileSystem();
793      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
794
795      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
796
797      long maxSeqId = 1050;
798      long minSeqId = 1000;
799
800      for (long i = minSeqId; i <= maxSeqId; i += 10) {
801        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
802        fs.create(recoveredEdits);
803        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
804
805        long time = System.nanoTime();
806        WALEdit edit = new WALEdit();
807        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
808            .toBytes(i)));
809        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
810            HConstants.DEFAULT_CLUSTER_ID), edit));
811
812        writer.close();
813      }
814      long recoverSeqId = 1030;
815      MonitoredTask status = TaskMonitor.get().createStatus(method);
816      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
817      for (HStore store : region.getStores()) {
818        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
819      }
820      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
821      assertEquals(maxSeqId, seqId);
822      region.getMVCC().advanceTo(seqId);
823      Get get = new Get(row);
824      Result result = region.get(get);
825      for (long i = minSeqId; i <= maxSeqId; i += 10) {
826        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
827        if (i < recoverSeqId) {
828          assertEquals(0, kvs.size());
829        } else {
830          assertEquals(1, kvs.size());
831          assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
832        }
833      }
834    } finally {
835      HBaseTestingUtility.closeRegionAndWAL(this.region);
836      this.region = null;
837      wals.close();
838    }
839  }
840
841  @Test
842  public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
843    byte[] family = Bytes.toBytes("family");
844    this.region = initHRegion(tableName, method, CONF, family);
845    Path regiondir = region.getRegionFileSystem().getRegionDir();
846    FileSystem fs = region.getRegionFileSystem().getFileSystem();
847
848    Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
849    for (int i = 1000; i < 1050; i += 10) {
850      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
851      FSDataOutputStream dos = fs.create(recoveredEdits);
852      dos.writeInt(i);
853      dos.close();
854    }
855    long minSeqId = 2000;
856    Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1));
857    FSDataOutputStream dos = fs.create(recoveredEdits);
858    dos.close();
859
860    Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
861    for (HStore store : region.getStores()) {
862      maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId);
863    }
864    long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
865    assertEquals(minSeqId, seqId);
866  }
867
868  @Test
869  public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
870    byte[] family = Bytes.toBytes("family");
871    this.region = initHRegion(tableName, method, CONF, family);
872    final WALFactory wals = new WALFactory(CONF, method);
873    try {
874      Path regiondir = region.getRegionFileSystem().getRegionDir();
875      FileSystem fs = region.getRegionFileSystem().getFileSystem();
876      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
877      byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]);
878
879      assertEquals(0, region.getStoreFileList(columns).size());
880
881      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
882
883      long maxSeqId = 1050;
884      long minSeqId = 1000;
885
886      for (long i = minSeqId; i <= maxSeqId; i += 10) {
887        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
888        fs.create(recoveredEdits);
889        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
890
891        long time = System.nanoTime();
892        WALEdit edit = null;
893        if (i == maxSeqId) {
894          edit = WALEdit.createCompaction(region.getRegionInfo(),
895          CompactionDescriptor.newBuilder()
896          .setTableName(ByteString.copyFrom(tableName.getName()))
897          .setFamilyName(ByteString.copyFrom(regionName))
898          .setEncodedRegionName(ByteString.copyFrom(regionName))
899          .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
900          .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
901          .build());
902        } else {
903          edit = new WALEdit();
904          edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
905            .toBytes(i)));
906        }
907        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
908            HConstants.DEFAULT_CLUSTER_ID), edit));
909        writer.close();
910      }
911
912      long recoverSeqId = 1030;
913      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
914      MonitoredTask status = TaskMonitor.get().createStatus(method);
915      for (HStore store : region.getStores()) {
916        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
917      }
918      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
919      assertEquals(maxSeqId, seqId);
920
921      // assert that the files are flushed
922      assertEquals(1, region.getStoreFileList(columns).size());
923
924    } finally {
925      HBaseTestingUtility.closeRegionAndWAL(this.region);
926      this.region = null;
927      wals.close();
928    }
929  }
930
931  @Test
932  public void testRecoveredEditsReplayCompaction() throws Exception {
933    testRecoveredEditsReplayCompaction(false);
934    testRecoveredEditsReplayCompaction(true);
935  }
936
937  public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
938    CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class);
939    byte[] family = Bytes.toBytes("family");
940    this.region = initHRegion(tableName, method, CONF, family);
941    final WALFactory wals = new WALFactory(CONF, method);
942    try {
943      Path regiondir = region.getRegionFileSystem().getRegionDir();
944      FileSystem fs = region.getRegionFileSystem().getFileSystem();
945      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
946
947      long maxSeqId = 3;
948      long minSeqId = 0;
949
950      for (long i = minSeqId; i < maxSeqId; i++) {
951        Put put = new Put(Bytes.toBytes(i));
952        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
953        region.put(put);
954        region.flush(true);
955      }
956
957      // this will create a region with 3 files
958      assertEquals(3, region.getStore(family).getStorefilesCount());
959      List<Path> storeFiles = new ArrayList<>(3);
960      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
961        storeFiles.add(sf.getPath());
962      }
963
964      // disable compaction completion
965      CONF.setBoolean("hbase.hstore.compaction.complete", false);
966      region.compactStores();
967
968      // ensure that nothing changed
969      assertEquals(3, region.getStore(family).getStorefilesCount());
970
971      // now find the compacted file, and manually add it to the recovered edits
972      Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
973      FileStatus[] files = CommonFSUtils.listStatus(fs, tmpDir);
974      String errorMsg = "Expected to find 1 file in the region temp directory "
975          + "from the compaction, could not find any";
976      assertNotNull(errorMsg, files);
977      assertEquals(errorMsg, 1, files.length);
978      // move the file inside region dir
979      Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
980          files[0].getPath());
981
982      byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes();
983      byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length];
984      for (int i=0; i < encodedNameAsBytes.length; i++) {
985        // Mix the byte array to have a new encodedName
986        fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1);
987      }
988
989      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
990        .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family,
991            storeFiles, Lists.newArrayList(newFile),
992            region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
993
994      WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
995          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
996
997      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
998
999      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1000      fs.create(recoveredEdits);
1001      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1002
1003      long time = System.nanoTime();
1004
1005      writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time,
1006          HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
1007          compactionDescriptor)));
1008      writer.close();
1009
1010      // close the region now, and reopen again
1011      region.getTableDescriptor();
1012      region.getRegionInfo();
1013      HBaseTestingUtility.closeRegionAndWAL(this.region);
1014      try {
1015        region = HRegion.openHRegion(region, null);
1016      } catch (WrongRegionException wre) {
1017        fail("Matching encoded region name should not have produced WrongRegionException");
1018      }
1019
1020      // now check whether we have only one store file, the compacted one
1021      Collection<HStoreFile> sfs = region.getStore(family).getStorefiles();
1022      for (HStoreFile sf : sfs) {
1023        LOG.info(Objects.toString(sf.getPath()));
1024      }
1025      if (!mismatchedRegionName) {
1026        assertEquals(1, region.getStore(family).getStorefilesCount());
1027      }
1028      files = CommonFSUtils.listStatus(fs, tmpDir);
1029      assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);
1030
1031      for (long i = minSeqId; i < maxSeqId; i++) {
1032        Get get = new Get(Bytes.toBytes(i));
1033        Result result = region.get(get);
1034        byte[] value = result.getValue(family, Bytes.toBytes(i));
1035        assertArrayEquals(Bytes.toBytes(i), value);
1036      }
1037    } finally {
1038      HBaseTestingUtility.closeRegionAndWAL(this.region);
1039      this.region = null;
1040      wals.close();
1041      CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class);
1042    }
1043  }
1044
1045  @Test
1046  public void testFlushMarkers() throws Exception {
1047    // tests that flush markers are written to WAL and handled at recovered edits
1048    byte[] family = Bytes.toBytes("family");
1049    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
1050    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1051    CommonFSUtils.setRootDir(walConf, logDir);
1052    final WALFactory wals = new WALFactory(walConf, method);
1053    final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build());
1054
1055    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1056      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1057    try {
1058      Path regiondir = region.getRegionFileSystem().getRegionDir();
1059      FileSystem fs = region.getRegionFileSystem().getFileSystem();
1060      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
1061
1062      long maxSeqId = 3;
1063      long minSeqId = 0;
1064
1065      for (long i = minSeqId; i < maxSeqId; i++) {
1066        Put put = new Put(Bytes.toBytes(i));
1067        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1068        region.put(put);
1069        region.flush(true);
1070      }
1071
1072      // this will create a region with 3 files from flush
1073      assertEquals(3, region.getStore(family).getStorefilesCount());
1074      List<String> storeFiles = new ArrayList<>(3);
1075      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
1076        storeFiles.add(sf.getPath().getName());
1077      }
1078
1079      // now verify that the flush markers are written
1080      wal.shutdown();
1081      WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
1082        TEST_UTIL.getConfiguration());
1083      try {
1084        List<WAL.Entry> flushDescriptors = new ArrayList<>();
1085        long lastFlushSeqId = -1;
1086        while (true) {
1087          WAL.Entry entry = reader.next();
1088          if (entry == null) {
1089            break;
1090          }
1091          Cell cell = entry.getEdit().getCells().get(0);
1092          if (WALEdit.isMetaEditFamily(cell)) {
1093            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
1094            assertNotNull(flushDesc);
1095            assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
1096            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1097              assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
1098            } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1099              assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
1100            }
1101            lastFlushSeqId = flushDesc.getFlushSequenceNumber();
1102            assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
1103            assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
1104            StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
1105            assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
1106            assertEquals("family", storeFlushDesc.getStoreHomeDir());
1107            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1108              assertEquals(0, storeFlushDesc.getFlushOutputCount());
1109            } else {
1110              assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
1111              assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
1112            }
1113
1114            flushDescriptors.add(entry);
1115          }
1116        }
1117
1118        assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
1119
1120        // now write those markers to the recovered edits again.
1121
1122        Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
1123
1124        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1125        fs.create(recoveredEdits);
1126        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1127
1128        for (WAL.Entry entry : flushDescriptors) {
1129          writer.append(entry);
1130        }
1131        writer.close();
1132      } finally {
1133        if (null != reader) {
1134          try {
1135            reader.close();
1136          } catch (IOException exception) {
1137            LOG.warn("Problem closing wal: " + exception.getMessage());
1138            LOG.debug("exception details", exception);
1139          }
1140        }
1141      }
1142
1143      // close the region now, and reopen again
1144      HBaseTestingUtility.closeRegionAndWAL(this.region);
1145      region = HRegion.openHRegion(region, null);
1146
1147      // now check whether we have can read back the data from region
1148      for (long i = minSeqId; i < maxSeqId; i++) {
1149        Get get = new Get(Bytes.toBytes(i));
1150        Result result = region.get(get);
1151        byte[] value = result.getValue(family, Bytes.toBytes(i));
1152        assertArrayEquals(Bytes.toBytes(i), value);
1153      }
1154    } finally {
1155      HBaseTestingUtility.closeRegionAndWAL(this.region);
1156      this.region = null;
1157      wals.close();
1158    }
1159  }
1160
1161  static class IsFlushWALMarker implements ArgumentMatcher<WALEdit> {
1162    volatile FlushAction[] actions;
1163    public IsFlushWALMarker(FlushAction... actions) {
1164      this.actions = actions;
1165    }
1166    @Override
1167    public boolean matches(WALEdit edit) {
1168      List<Cell> cells = edit.getCells();
1169      if (cells.isEmpty()) {
1170        return false;
1171      }
1172      if (WALEdit.isMetaEditFamily(cells.get(0))) {
1173        FlushDescriptor desc;
1174        try {
1175          desc = WALEdit.getFlushDescriptor(cells.get(0));
1176        } catch (IOException e) {
1177          LOG.warn(e.toString(), e);
1178          return false;
1179        }
1180        if (desc != null) {
1181          for (FlushAction action : actions) {
1182            if (desc.getAction() == action) {
1183              return true;
1184            }
1185          }
1186        }
1187      }
1188      return false;
1189    }
1190    public IsFlushWALMarker set(FlushAction... actions) {
1191      this.actions = actions;
1192      return this;
1193    }
1194  }
1195
1196  @Test
1197  public void testFlushMarkersWALFail() throws Exception {
1198    // test the cases where the WAL append for flush markers fail.
1199    byte[] family = Bytes.toBytes("family");
1200
1201    // spy an actual WAL implementation to throw exception (was not able to mock)
1202    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log");
1203
1204    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1205    CommonFSUtils.setRootDir(walConf, logDir);
1206    // Make up a WAL that we can manipulate at append time.
1207    class FailAppendFlushMarkerWAL extends FSHLog {
1208      volatile FlushAction [] flushActions = null;
1209
1210      public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf)
1211      throws IOException {
1212        super(fs, root, logDir, conf);
1213      }
1214
1215      @Override
1216      protected Writer createWriterInstance(Path path) throws IOException {
1217        final Writer w = super.createWriterInstance(path);
1218        return new Writer() {
1219          @Override
1220          public void close() throws IOException {
1221            w.close();
1222          }
1223
1224          @Override
1225          public void sync(boolean forceSync) throws IOException {
1226            w.sync(forceSync);
1227          }
1228
1229          @Override
1230          public void append(Entry entry) throws IOException {
1231            List<Cell> cells = entry.getEdit().getCells();
1232            if (WALEdit.isMetaEditFamily(cells.get(0))) {
1233              FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0));
1234              if (desc != null) {
1235                for (FlushAction flushAction: flushActions) {
1236                  if (desc.getAction().equals(flushAction)) {
1237                    throw new IOException("Failed to append flush marker! " + flushAction);
1238                  }
1239                }
1240              }
1241            }
1242            w.append(entry);
1243          }
1244
1245          @Override
1246          public long getLength() {
1247            return w.getLength();
1248          }
1249
1250          @Override
1251          public long getSyncedLength() {
1252            return w.getSyncedLength();
1253          }
1254        };
1255      }
1256    }
1257    FailAppendFlushMarkerWAL wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf),
1258      CommonFSUtils.getRootDir(walConf), method, walConf);
1259    wal.init();
1260    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1261      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1262    int i = 0;
1263    Put put = new Put(Bytes.toBytes(i));
1264    put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
1265    put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1266    region.put(put);
1267
1268    // 1. Test case where START_FLUSH throws exception
1269    wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};
1270
1271    // start cache flush will throw exception
1272    try {
1273      region.flush(true);
1274      fail("This should have thrown exception");
1275    } catch (DroppedSnapshotException unexpected) {
1276      // this should not be a dropped snapshot exception. Meaning that RS will not abort
1277      throw unexpected;
1278    } catch (IOException expected) {
1279      // expected
1280    }
1281    // The WAL is hosed now. It has two edits appended. We cannot roll the log without it
1282    // throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
1283    region.close(true);
1284    wal.close();
1285
1286    // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
1287    wal.flushActions = new FlushAction[] { FlushAction.COMMIT_FLUSH };
1288    wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), CommonFSUtils.getRootDir(walConf),
1289      method, walConf);
1290    wal.init();
1291    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1292      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1293    region.put(put);
1294    // 3. Test case where ABORT_FLUSH will throw exception.
1295    // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
1296    // DroppedSnapshotException. Below COMMIT_FLUSH will cause flush to abort
1297    wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH};
1298
1299    try {
1300      region.flush(true);
1301      fail("This should have thrown exception");
1302    } catch (DroppedSnapshotException expected) {
1303      // we expect this exception, since we were able to write the snapshot, but failed to
1304      // write the flush marker to WAL
1305    } catch (IOException unexpected) {
1306      throw unexpected;
1307    }
1308  }
1309
1310  @Test
1311  public void testGetWhileRegionClose() throws IOException {
1312    Configuration hc = initSplit();
1313    int numRows = 100;
1314    byte[][] families = { fam1, fam2, fam3 };
1315
1316    // Setting up region
1317    this.region = initHRegion(tableName, method, hc, families);
1318    // Put data in region
1319    final int startRow = 100;
1320    putData(startRow, numRows, qual1, families);
1321    putData(startRow, numRows, qual2, families);
1322    putData(startRow, numRows, qual3, families);
1323    final AtomicBoolean done = new AtomicBoolean(false);
1324    final AtomicInteger gets = new AtomicInteger(0);
1325    GetTillDoneOrException[] threads = new GetTillDoneOrException[10];
1326    try {
1327      // Set ten threads running concurrently getting from the region.
1328      for (int i = 0; i < threads.length / 2; i++) {
1329        threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1330        threads[i].setDaemon(true);
1331        threads[i].start();
1332      }
1333      // Artificially make the condition by setting closing flag explicitly.
1334      // I can't make the issue happen with a call to region.close().
1335      this.region.closing.set(true);
1336      for (int i = threads.length / 2; i < threads.length; i++) {
1337        threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1338        threads[i].setDaemon(true);
1339        threads[i].start();
1340      }
1341    } finally {
1342      if (this.region != null) {
1343        HBaseTestingUtility.closeRegionAndWAL(this.region);
1344        this.region = null;
1345      }
1346    }
1347    done.set(true);
1348    for (GetTillDoneOrException t : threads) {
1349      try {
1350        t.join();
1351      } catch (InterruptedException e) {
1352        e.printStackTrace();
1353      }
1354      if (t.e != null) {
1355        LOG.info("Exception=" + t.e);
1356        assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
1357      }
1358    }
1359  }
1360
1361  /*
1362   * Thread that does get on single row until 'done' flag is flipped. If an
1363   * exception causes us to fail, it records it.
1364   */
1365  class GetTillDoneOrException extends Thread {
1366    private final Get g;
1367    private final AtomicBoolean done;
1368    private final AtomicInteger count;
1369    private Exception e;
1370
1371    GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d,
1372        final AtomicInteger c) {
1373      super("getter." + i);
1374      this.g = new Get(r);
1375      this.done = d;
1376      this.count = c;
1377    }
1378
1379    @Override
1380    public void run() {
1381      while (!this.done.get()) {
1382        try {
1383          assertTrue(region.get(g).size() > 0);
1384          this.count.incrementAndGet();
1385        } catch (Exception e) {
1386          this.e = e;
1387          break;
1388        }
1389      }
1390    }
1391  }
1392
1393  /*
1394   * An involved filter test. Has multiple column families and deletes in mix.
1395   */
1396  @Test
1397  public void testWeirdCacheBehaviour() throws Exception {
1398    final TableName tableName = TableName.valueOf(name.getMethodName());
1399    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
1400        Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
1401    this.region = initHRegion(tableName, method, CONF, FAMILIES);
1402    String value = "this is the value";
1403    String value2 = "this is some other value";
1404    String keyPrefix1 = "prefix1";
1405    String keyPrefix2 = "prefix2";
1406    String keyPrefix3 = "prefix3";
1407    putRows(this.region, 3, value, keyPrefix1);
1408    putRows(this.region, 3, value, keyPrefix2);
1409    putRows(this.region, 3, value, keyPrefix3);
1410    putRows(this.region, 3, value2, keyPrefix1);
1411    putRows(this.region, 3, value2, keyPrefix2);
1412    putRows(this.region, 3, value2, keyPrefix3);
1413    System.out.println("Checking values for key: " + keyPrefix1);
1414    assertEquals("Got back incorrect number of rows from scan", 3,
1415        getNumberOfRows(keyPrefix1, value2, this.region));
1416    System.out.println("Checking values for key: " + keyPrefix2);
1417    assertEquals("Got back incorrect number of rows from scan", 3,
1418        getNumberOfRows(keyPrefix2, value2, this.region));
1419    System.out.println("Checking values for key: " + keyPrefix3);
1420    assertEquals("Got back incorrect number of rows from scan", 3,
1421        getNumberOfRows(keyPrefix3, value2, this.region));
1422    deleteColumns(this.region, value2, keyPrefix1);
1423    deleteColumns(this.region, value2, keyPrefix2);
1424    deleteColumns(this.region, value2, keyPrefix3);
1425    System.out.println("Starting important checks.....");
1426    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0,
1427        getNumberOfRows(keyPrefix1, value2, this.region));
1428    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0,
1429        getNumberOfRows(keyPrefix2, value2, this.region));
1430    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0,
1431        getNumberOfRows(keyPrefix3, value2, this.region));
1432  }
1433
1434  @Test
1435  public void testAppendWithReadOnlyTable() throws Exception {
1436    final TableName tableName = TableName.valueOf(name.getMethodName());
1437    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1438    boolean exceptionCaught = false;
1439    Append append = new Append(Bytes.toBytes("somerow"));
1440    append.setDurability(Durability.SKIP_WAL);
1441    append.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"),
1442        Bytes.toBytes("somevalue"));
1443    try {
1444      region.append(append);
1445    } catch (IOException e) {
1446      exceptionCaught = true;
1447    }
1448    assertTrue(exceptionCaught == true);
1449  }
1450
1451  @Test
1452  public void testIncrWithReadOnlyTable() throws Exception {
1453    final TableName tableName = TableName.valueOf(name.getMethodName());
1454    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1455    boolean exceptionCaught = false;
1456    Increment inc = new Increment(Bytes.toBytes("somerow"));
1457    inc.setDurability(Durability.SKIP_WAL);
1458    inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);
1459    try {
1460      region.increment(inc);
1461    } catch (IOException e) {
1462      exceptionCaught = true;
1463    }
1464    assertTrue(exceptionCaught == true);
1465  }
1466
1467  private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException {
1468    InternalScanner scanner = buildScanner(keyPrefix, value, r);
1469    int count = 0;
1470    boolean more = false;
1471    List<Cell> results = new ArrayList<>();
1472    do {
1473      more = scanner.next(results);
1474      if (results != null && !results.isEmpty())
1475        count++;
1476      else
1477        break;
1478      Delete delete = new Delete(CellUtil.cloneRow(results.get(0)));
1479      delete.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
1480      r.delete(delete);
1481      results.clear();
1482    } while (more);
1483    assertEquals("Did not perform correct number of deletes", 3, count);
1484  }
1485
1486  private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception {
1487    InternalScanner resultScanner = buildScanner(keyPrefix, value, r);
1488    int numberOfResults = 0;
1489    List<Cell> results = new ArrayList<>();
1490    boolean more = false;
1491    do {
1492      more = resultScanner.next(results);
1493      if (results != null && !results.isEmpty())
1494        numberOfResults++;
1495      else
1496        break;
1497      for (Cell kv : results) {
1498        System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv)));
1499      }
1500      results.clear();
1501    } while (more);
1502    return numberOfResults;
1503  }
1504
1505  private InternalScanner buildScanner(String keyPrefix, String value, HRegion r)
1506      throws IOException {
1507    // Defaults FilterList.Operator.MUST_PASS_ALL.
1508    FilterList allFilters = new FilterList();
1509    allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
1510    // Only return rows where this column value exists in the row.
1511    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"),
1512        Bytes.toBytes("qual2"), CompareOperator.EQUAL, Bytes.toBytes(value));
1513    filter.setFilterIfMissing(true);
1514    allFilters.addFilter(filter);
1515    Scan scan = new Scan();
1516    scan.addFamily(Bytes.toBytes("trans-blob"));
1517    scan.addFamily(Bytes.toBytes("trans-type"));
1518    scan.addFamily(Bytes.toBytes("trans-date"));
1519    scan.addFamily(Bytes.toBytes("trans-tags"));
1520    scan.addFamily(Bytes.toBytes("trans-group"));
1521    scan.setFilter(allFilters);
1522    return r.getScanner(scan);
1523  }
1524
1525  private void putRows(HRegion r, int numRows, String value, String key) throws IOException {
1526    for (int i = 0; i < numRows; i++) {
1527      String row = key + "_" + i/* UUID.randomUUID().toString() */;
1528      System.out.println(String.format("Saving row: %s, with value %s", row, value));
1529      Put put = new Put(Bytes.toBytes(row));
1530      put.setDurability(Durability.SKIP_WAL);
1531      put.addColumn(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob"));
1532      put.addColumn(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
1533      put.addColumn(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999"));
1534      put.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value));
1535      put.addColumn(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId"));
1536      r.put(put);
1537    }
1538  }
1539
1540  @Test
1541  public void testFamilyWithAndWithoutColon() throws Exception {
1542    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1543    this.region = initHRegion(tableName, method, CONF, cf);
1544    Put p = new Put(tableName.toBytes());
1545    byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
1546    p.addColumn(cfwithcolon, cfwithcolon, cfwithcolon);
1547    boolean exception = false;
1548    try {
1549      this.region.put(p);
1550    } catch (NoSuchColumnFamilyException e) {
1551      exception = true;
1552    }
1553    assertTrue(exception);
1554  }
1555
1556  @Test
1557  public void testBatchPut_whileNoRowLocksHeld() throws IOException {
1558    final Put[] puts = new Put[10];
1559    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1560    long syncs = prepareRegionForBachPut(puts, source, false);
1561
1562    OperationStatus[] codes = this.region.batchMutate(puts);
1563    assertEquals(10, codes.length);
1564    for (int i = 0; i < 10; i++) {
1565      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1566    }
1567    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1568
1569    LOG.info("Next a batch put with one invalid family");
1570    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1571    codes = this.region.batchMutate(puts);
1572    assertEquals(10, codes.length);
1573    for (int i = 0; i < 10; i++) {
1574      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1575          codes[i].getOperationStatusCode());
1576    }
1577
1578    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
1579  }
1580
1581  @Test
1582  public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
1583    final Put[] puts = new Put[10];
1584    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1585    long syncs = prepareRegionForBachPut(puts, source, false);
1586
1587    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1588
1589    LOG.info("batchPut will have to break into four batches to avoid row locks");
1590    RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
1591    RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1"));
1592    RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
1593    RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);
1594
1595    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1596    final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>();
1597    final CountDownLatch startingPuts = new CountDownLatch(1);
1598    final CountDownLatch startingClose = new CountDownLatch(1);
1599    TestThread putter = new TestThread(ctx) {
1600      @Override
1601      public void doWork() throws IOException {
1602        startingPuts.countDown();
1603        retFromThread.set(region.batchMutate(puts));
1604      }
1605    };
1606    LOG.info("...starting put thread while holding locks");
1607    ctx.addThread(putter);
1608    ctx.startThreads();
1609
1610    // Now attempt to close the region from another thread.  Prior to HBASE-12565
1611    // this would cause the in-progress batchMutate operation to to fail with
1612    // exception because it use to release and re-acquire the close-guard lock
1613    // between batches.  Caller then didn't get status indicating which writes succeeded.
1614    // We now expect this thread to block until the batchMutate call finishes.
1615    Thread regionCloseThread = new TestThread(ctx) {
1616      @Override
1617      public void doWork() {
1618        try {
1619          startingPuts.await();
1620          // Give some time for the batch mutate to get in.
1621          // We don't want to race with the mutate
1622          Thread.sleep(10);
1623          startingClose.countDown();
1624          HBaseTestingUtility.closeRegionAndWAL(region);
1625          region = null;
1626        } catch (IOException e) {
1627          throw new RuntimeException(e);
1628        } catch (InterruptedException e) {
1629          throw new RuntimeException(e);
1630        }
1631      }
1632    };
1633    regionCloseThread.start();
1634
1635    startingClose.await();
1636    startingPuts.await();
1637    Thread.sleep(100);
1638    LOG.info("...releasing row lock 1, which should let put thread continue");
1639    rowLock1.release();
1640    rowLock2.release();
1641    rowLock3.release();
1642    waitForCounter(source, "syncTimeNumOps", syncs + 1);
1643
1644    LOG.info("...joining on put thread");
1645    ctx.stop();
1646    regionCloseThread.join();
1647
1648    OperationStatus[] codes = retFromThread.get();
1649    for (int i = 0; i < codes.length; i++) {
1650      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1651          codes[i].getOperationStatusCode());
1652    }
1653    rowLock4.release();
1654  }
1655
1656  private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
1657      throws InterruptedException {
1658    long startWait = System.currentTimeMillis();
1659    long currentCount;
1660    while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
1661      Thread.sleep(100);
1662      if (System.currentTimeMillis() - startWait > 10000) {
1663        fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
1664            expectedCount, currentCount));
1665      }
1666    }
1667  }
1668
1669  @Test
1670  public void testAtomicBatchPut() throws IOException {
1671    final Put[] puts = new Put[10];
1672    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1673    long syncs = prepareRegionForBachPut(puts, source, false);
1674
1675    // 1. Straight forward case, should succeed
1676    MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true,
1677        HConstants.NO_NONCE, HConstants.NO_NONCE);
1678    OperationStatus[] codes = this.region.batchMutate(batchOp);
1679    assertEquals(10, codes.length);
1680    for (int i = 0; i < 10; i++) {
1681      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1682    }
1683    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1684
1685    // 2. Failed to get lock
1686    RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
1687    // Method {@link HRegion#getRowLock(byte[])} is reentrant. As 'row_3' is locked in this
1688    // thread, need to run {@link HRegion#batchMutate(HRegion.BatchOperation)} in different thread
1689    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1690    final AtomicReference<IOException> retFromThread = new AtomicReference<>();
1691    final CountDownLatch finishedPuts = new CountDownLatch(1);
1692    final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
1693        HConstants
1694        .NO_NONCE,
1695        HConstants.NO_NONCE);
1696    TestThread putter = new TestThread(ctx) {
1697      @Override
1698      public void doWork() throws IOException {
1699        try {
1700          region.batchMutate(finalBatchOp);
1701        } catch (IOException ioe) {
1702          LOG.error("test failed!", ioe);
1703          retFromThread.set(ioe);
1704        }
1705        finishedPuts.countDown();
1706      }
1707    };
1708    LOG.info("...starting put thread while holding locks");
1709    ctx.addThread(putter);
1710    ctx.startThreads();
1711    LOG.info("...waiting for batch puts while holding locks");
1712    try {
1713      finishedPuts.await();
1714    } catch (InterruptedException e) {
1715      LOG.error("Interrupted!", e);
1716    } finally {
1717      if (lock != null) {
1718        lock.release();
1719      }
1720    }
1721    assertNotNull(retFromThread.get());
1722    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1723
1724    // 3. Exception thrown in validation
1725    LOG.info("Next a batch put with one invalid family");
1726    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1727    batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
1728        HConstants.NO_NONCE);
1729    thrown.expect(NoSuchColumnFamilyException.class);
1730    this.region.batchMutate(batchOp);
1731  }
1732
1733  @Test
1734  public void testBatchPutWithTsSlop() throws Exception {
1735    // add data with a timestamp that is too recent for range. Ensure assert
1736    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1737    final Put[] puts = new Put[10];
1738    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1739
1740    long syncs = prepareRegionForBachPut(puts, source, true);
1741
1742    OperationStatus[] codes = this.region.batchMutate(puts);
1743    assertEquals(10, codes.length);
1744    for (int i = 0; i < 10; i++) {
1745      assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
1746    }
1747    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1748  }
1749
1750  /**
1751   * @return syncs initial syncTimeNumOps
1752   */
1753  private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source,
1754      boolean slop) throws IOException {
1755    this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
1756
1757    LOG.info("First a batch put with all valid puts");
1758    for (int i = 0; i < puts.length; i++) {
1759      puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) :
1760          new Put(Bytes.toBytes("row_" + i));
1761      puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value);
1762    }
1763
1764    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1765    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1766    return syncs;
1767  }
1768
1769  // ////////////////////////////////////////////////////////////////////////////
1770  // checkAndMutate tests
1771  // ////////////////////////////////////////////////////////////////////////////
1772  @Test
1773  public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
1774    byte[] row1 = Bytes.toBytes("row1");
1775    byte[] fam1 = Bytes.toBytes("fam1");
1776    byte[] qf1 = Bytes.toBytes("qualifier");
1777    byte[] emptyVal = new byte[] {};
1778    byte[] val1 = Bytes.toBytes("value1");
1779    byte[] val2 = Bytes.toBytes("value2");
1780
1781    // Setting up region
1782    this.region = initHRegion(tableName, method, CONF, fam1);
1783    // Putting empty data in key
1784    Put put = new Put(row1);
1785    put.addColumn(fam1, qf1, emptyVal);
1786
1787    // checkAndPut with empty value
1788    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1789        new BinaryComparator(emptyVal), put);
1790    assertTrue(res);
1791
1792    // Putting data in key
1793    put = new Put(row1);
1794    put.addColumn(fam1, qf1, val1);
1795
1796    // checkAndPut with correct value
1797    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1798        new BinaryComparator(emptyVal), put);
1799    assertTrue(res);
1800
1801    // not empty anymore
1802    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1803        new BinaryComparator(emptyVal), put);
1804    assertFalse(res);
1805
1806    Delete delete = new Delete(row1);
1807    delete.addColumn(fam1, qf1);
1808    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1809        new BinaryComparator(emptyVal), delete);
1810    assertFalse(res);
1811
1812    put = new Put(row1);
1813    put.addColumn(fam1, qf1, val2);
1814    // checkAndPut with correct value
1815    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1816        new BinaryComparator(val1), put);
1817    assertTrue(res);
1818
1819    // checkAndDelete with correct value
1820    delete = new Delete(row1);
1821    delete.addColumn(fam1, qf1);
1822    delete.addColumn(fam1, qf1);
1823    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1824        new BinaryComparator(val2), delete);
1825    assertTrue(res);
1826
1827    delete = new Delete(row1);
1828    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1829        new BinaryComparator(emptyVal), delete);
1830    assertTrue(res);
1831
1832    // checkAndPut looking for a null value
1833    put = new Put(row1);
1834    put.addColumn(fam1, qf1, val1);
1835
1836    res = region
1837        .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put);
1838    assertTrue(res);
1839  }
1840
1841  @Test
1842  public void testCheckAndMutate_WithWrongValue() throws IOException {
1843    byte[] row1 = Bytes.toBytes("row1");
1844    byte[] fam1 = Bytes.toBytes("fam1");
1845    byte[] qf1 = Bytes.toBytes("qualifier");
1846    byte[] val1 = Bytes.toBytes("value1");
1847    byte[] val2 = Bytes.toBytes("value2");
1848    BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
1849    BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
1850
1851    // Setting up region
1852    this.region = initHRegion(tableName, method, CONF, fam1);
1853    // Putting data in key
1854    Put put = new Put(row1);
1855    put.addColumn(fam1, qf1, val1);
1856    region.put(put);
1857
1858    // checkAndPut with wrong value
1859    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1860        new BinaryComparator(val2), put);
1861    assertEquals(false, res);
1862
1863    // checkAndDelete with wrong value
1864    Delete delete = new Delete(row1);
1865    delete.addFamily(fam1);
1866    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1867        new BinaryComparator(val2), put);
1868    assertEquals(false, res);
1869
1870    // Putting data in key
1871    put = new Put(row1);
1872    put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1873    region.put(put);
1874
1875    // checkAndPut with wrong value
1876    res =
1877        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1878            new BigDecimalComparator(bd2), put);
1879    assertEquals(false, res);
1880
1881    // checkAndDelete with wrong value
1882    delete = new Delete(row1);
1883    delete.addFamily(fam1);
1884    res =
1885        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1886            new BigDecimalComparator(bd2), put);
1887    assertEquals(false, res);
1888  }
1889
1890  @Test
1891  public void testCheckAndMutate_WithCorrectValue() throws IOException {
1892    byte[] row1 = Bytes.toBytes("row1");
1893    byte[] fam1 = Bytes.toBytes("fam1");
1894    byte[] qf1 = Bytes.toBytes("qualifier");
1895    byte[] val1 = Bytes.toBytes("value1");
1896    BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
1897
1898    // Setting up region
1899    this.region = initHRegion(tableName, method, CONF, fam1);
1900    // Putting data in key
1901    long now = System.currentTimeMillis();
1902    Put put = new Put(row1);
1903    put.addColumn(fam1, qf1, now, val1);
1904    region.put(put);
1905
1906    // checkAndPut with correct value
1907    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1908        new BinaryComparator(val1), put);
1909    assertEquals("First", true, res);
1910
1911    // checkAndDelete with correct value
1912    Delete delete = new Delete(row1, now + 1);
1913    delete.addColumn(fam1, qf1);
1914    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
1915        delete);
1916    assertEquals("Delete", true, res);
1917
1918    // Putting data in key
1919    put = new Put(row1);
1920    put.addColumn(fam1, qf1, now + 2, Bytes.toBytes(bd1));
1921    region.put(put);
1922
1923    // checkAndPut with correct value
1924    res =
1925        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1926            bd1), put);
1927    assertEquals("Second put", true, res);
1928
1929    // checkAndDelete with correct value
1930    delete = new Delete(row1, now + 3);
1931    delete.addColumn(fam1, qf1);
1932    res =
1933        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1934            bd1), delete);
1935    assertEquals("Second delete", true, res);
1936  }
1937
1938  @Test
1939  public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
1940    byte[] row1 = Bytes.toBytes("row1");
1941    byte[] fam1 = Bytes.toBytes("fam1");
1942    byte[] qf1 = Bytes.toBytes("qualifier");
1943    byte[] val1 = Bytes.toBytes("value1");
1944    byte[] val2 = Bytes.toBytes("value2");
1945    byte[] val3 = Bytes.toBytes("value3");
1946    byte[] val4 = Bytes.toBytes("value4");
1947
1948    // Setting up region
1949    this.region = initHRegion(tableName, method, CONF, fam1);
1950    // Putting val3 in key
1951    Put put = new Put(row1);
1952    put.addColumn(fam1, qf1, val3);
1953    region.put(put);
1954
1955    // Test CompareOp.LESS: original = val3, compare with val3, fail
1956    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1957        new BinaryComparator(val3), put);
1958    assertEquals(false, res);
1959
1960    // Test CompareOp.LESS: original = val3, compare with val4, fail
1961    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1962        new BinaryComparator(val4), put);
1963    assertEquals(false, res);
1964
1965    // Test CompareOp.LESS: original = val3, compare with val2,
1966    // succeed (now value = val2)
1967    put = new Put(row1);
1968    put.addColumn(fam1, qf1, val2);
1969    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1970        new BinaryComparator(val2), put);
1971    assertEquals(true, res);
1972
1973    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
1974    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1975        new BinaryComparator(val3), put);
1976    assertEquals(false, res);
1977
1978    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
1979    // succeed (value still = val2)
1980    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1981        new BinaryComparator(val2), put);
1982    assertEquals(true, res);
1983
1984    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
1985    // succeed (now value = val3)
1986    put = new Put(row1);
1987    put.addColumn(fam1, qf1, val3);
1988    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1989        new BinaryComparator(val1), put);
1990    assertEquals(true, res);
1991
1992    // Test CompareOp.GREATER: original = val3, compare with val3, fail
1993    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
1994        new BinaryComparator(val3), put);
1995    assertEquals(false, res);
1996
1997    // Test CompareOp.GREATER: original = val3, compare with val2, fail
1998    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
1999        new BinaryComparator(val2), put);
2000    assertEquals(false, res);
2001
2002    // Test CompareOp.GREATER: original = val3, compare with val4,
2003    // succeed (now value = val2)
2004    put = new Put(row1);
2005    put.addColumn(fam1, qf1, val2);
2006    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
2007        new BinaryComparator(val4), put);
2008    assertEquals(true, res);
2009
2010    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
2011    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2012        new BinaryComparator(val1), put);
2013    assertEquals(false, res);
2014
2015    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
2016    // succeed (value still = val2)
2017    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2018        new BinaryComparator(val2), put);
2019    assertEquals(true, res);
2020
2021    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
2022    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
2023        new BinaryComparator(val3), put);
2024    assertEquals(true, res);
2025  }
2026
2027  @Test
2028  public void testCheckAndPut_ThatPutWasWritten() throws IOException {
2029    byte[] row1 = Bytes.toBytes("row1");
2030    byte[] fam1 = Bytes.toBytes("fam1");
2031    byte[] fam2 = Bytes.toBytes("fam2");
2032    byte[] qf1 = Bytes.toBytes("qualifier");
2033    byte[] val1 = Bytes.toBytes("value1");
2034    byte[] val2 = Bytes.toBytes("value2");
2035
2036    byte[][] families = { fam1, fam2 };
2037
2038    // Setting up region
2039    this.region = initHRegion(tableName, method, CONF, families);
2040    // Putting data in the key to check
2041    Put put = new Put(row1);
2042    put.addColumn(fam1, qf1, val1);
2043    region.put(put);
2044
2045    // Creating put to add
2046    long ts = System.currentTimeMillis();
2047    KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
2048    put = new Put(row1);
2049    put.add(kv);
2050
2051    // checkAndPut with wrong value
2052    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
2053        new BinaryComparator(val1), put);
2054    assertEquals(true, res);
2055
2056    Get get = new Get(row1);
2057    get.addColumn(fam2, qf1);
2058    Cell[] actual = region.get(get).rawCells();
2059
2060    Cell[] expected = { kv };
2061
2062    assertEquals(expected.length, actual.length);
2063    for (int i = 0; i < actual.length; i++) {
2064      assertEquals(expected[i], actual[i]);
2065    }
2066  }
2067
2068  @Test
2069  public void testCheckAndPut_wrongRowInPut() throws IOException {
2070    this.region = initHRegion(tableName, method, CONF, COLUMNS);
2071    Put put = new Put(row2);
2072    put.addColumn(fam1, qual1, value1);
2073    try {
2074      region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL,
2075          new BinaryComparator(value2), put);
2076      fail();
2077    } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
2078      // expected exception.
2079    }
2080  }
2081
2082  @Test
2083  public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
2084    byte[] row1 = Bytes.toBytes("row1");
2085    byte[] fam1 = Bytes.toBytes("fam1");
2086    byte[] fam2 = Bytes.toBytes("fam2");
2087    byte[] qf1 = Bytes.toBytes("qualifier1");
2088    byte[] qf2 = Bytes.toBytes("qualifier2");
2089    byte[] qf3 = Bytes.toBytes("qualifier3");
2090    byte[] val1 = Bytes.toBytes("value1");
2091    byte[] val2 = Bytes.toBytes("value2");
2092    byte[] val3 = Bytes.toBytes("value3");
2093    byte[] emptyVal = new byte[] {};
2094
2095    byte[][] families = { fam1, fam2 };
2096
2097    // Setting up region
2098    this.region = initHRegion(tableName, method, CONF, families);
2099    // Put content
2100    Put put = new Put(row1);
2101    put.addColumn(fam1, qf1, val1);
2102    region.put(put);
2103    Threads.sleep(2);
2104
2105    put = new Put(row1);
2106    put.addColumn(fam1, qf1, val2);
2107    put.addColumn(fam2, qf1, val3);
2108    put.addColumn(fam2, qf2, val2);
2109    put.addColumn(fam2, qf3, val1);
2110    put.addColumn(fam1, qf3, val1);
2111    region.put(put);
2112
2113    LOG.info("get={}", region.get(new Get(row1).addColumn(fam1, qf1)).toString());
2114
2115    // Multi-column delete
2116    Delete delete = new Delete(row1);
2117    delete.addColumn(fam1, qf1);
2118    delete.addColumn(fam2, qf1);
2119    delete.addColumn(fam1, qf3);
2120    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
2121        new BinaryComparator(val2), delete);
2122    assertEquals(true, res);
2123
2124    Get get = new Get(row1);
2125    get.addColumn(fam1, qf1);
2126    get.addColumn(fam1, qf3);
2127    get.addColumn(fam2, qf2);
2128    Result r = region.get(get);
2129    assertEquals(2, r.size());
2130    assertArrayEquals(val1, r.getValue(fam1, qf1));
2131    assertArrayEquals(val2, r.getValue(fam2, qf2));
2132
2133    // Family delete
2134    delete = new Delete(row1);
2135    delete.addFamily(fam2);
2136    res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL,
2137        new BinaryComparator(emptyVal), delete);
2138    assertEquals(true, res);
2139
2140    get = new Get(row1);
2141    r = region.get(get);
2142    assertEquals(1, r.size());
2143    assertArrayEquals(val1, r.getValue(fam1, qf1));
2144
2145    // Row delete
2146    delete = new Delete(row1);
2147    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
2148        delete);
2149    assertEquals(true, res);
2150    get = new Get(row1);
2151    r = region.get(get);
2152    assertEquals(0, r.size());
2153  }
2154
2155  @Test
2156  public void testCheckAndMutate_WithFilters() throws Throwable {
2157    final byte[] FAMILY = Bytes.toBytes("fam");
2158
2159    // Setting up region
2160    this.region = initHRegion(tableName, method, CONF, FAMILY);
2161
2162    // Put one row
2163    Put put = new Put(row);
2164    put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
2165    put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
2166    put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
2167    region.put(put);
2168
2169    // Put with success
2170    boolean ok = region.checkAndMutate(row,
2171      new FilterList(
2172        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2173          Bytes.toBytes("a")),
2174        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
2175          Bytes.toBytes("b"))
2176      ),
2177      new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
2178    assertTrue(ok);
2179
2180    Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
2181    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
2182
2183    // Put with failure
2184    ok = region.checkAndMutate(row,
2185      new FilterList(
2186        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2187          Bytes.toBytes("a")),
2188        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
2189          Bytes.toBytes("c"))
2190      ),
2191      new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")));
2192    assertFalse(ok);
2193
2194    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).isEmpty());
2195
2196    // Delete with success
2197    ok = region.checkAndMutate(row,
2198      new FilterList(
2199        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2200          Bytes.toBytes("a")),
2201        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
2202          Bytes.toBytes("b"))
2203      ),
2204      new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")));
2205    assertTrue(ok);
2206
2207    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).isEmpty());
2208
2209    // Mutate with success
2210    ok = region.checkAndRowMutate(row,
2211      new FilterList(
2212        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2213          Bytes.toBytes("a")),
2214        new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
2215          Bytes.toBytes("b"))
2216      ),
2217      new RowMutations(row)
2218        .add((Mutation) new Put(row)
2219          .addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
2220        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
2221    assertTrue(ok);
2222
2223    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("E")));
2224    assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
2225
2226    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
2227  }
2228
2229  @Test
2230  public void testCheckAndMutate_WithFiltersAndTimeRange() throws Throwable {
2231    final byte[] FAMILY = Bytes.toBytes("fam");
2232
2233    // Setting up region
2234    this.region = initHRegion(tableName, method, CONF, FAMILY);
2235
2236    // Put with specifying the timestamp
2237    region.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")));
2238
2239    // Put with success
2240    boolean ok = region.checkAndMutate(row,
2241      new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2242        Bytes.toBytes("a")),
2243      TimeRange.between(0, 101),
2244      new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
2245    assertTrue(ok);
2246
2247    Result result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B")));
2248    assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
2249
2250    // Put with failure
2251    ok = region.checkAndMutate(row,
2252      new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2253        Bytes.toBytes("a")),
2254      TimeRange.between(0, 100),
2255      new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
2256    assertFalse(ok);
2257
2258    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).isEmpty());
2259
2260    // Mutate with success
2261    ok = region.checkAndRowMutate(row,
2262      new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
2263        Bytes.toBytes("a")),
2264      TimeRange.between(0, 101),
2265      new RowMutations(row)
2266        .add((Mutation) new Put(row)
2267          .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
2268        .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
2269    assertTrue(ok);
2270
2271    result = region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D")));
2272    assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
2273
2274    assertTrue(region.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).isEmpty());
2275  }
2276
2277  @Test
2278  public void testCheckAndMutate_wrongMutationType() throws Throwable {
2279    // Setting up region
2280    this.region = initHRegion(tableName, method, CONF, fam1);
2281
2282    try {
2283      region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(value1),
2284        new Increment(row).addColumn(fam1, qual1, 1));
2285      fail("should throw DoNotRetryIOException");
2286    } catch (DoNotRetryIOException e) {
2287      assertEquals("Action must be Put or Delete", e.getMessage());
2288    }
2289
2290    try {
2291      region.checkAndMutate(row,
2292        new SingleColumnValueFilter(fam1, qual1, CompareOperator.EQUAL, value1),
2293        new Increment(row).addColumn(fam1, qual1, 1));
2294      fail("should throw DoNotRetryIOException");
2295    } catch (DoNotRetryIOException e) {
2296      assertEquals("Action must be Put or Delete", e.getMessage());
2297    }
2298  }
2299
2300  @Test
2301  public void testCheckAndMutate_wrongRow() throws Throwable {
2302    final byte[] wrongRow = Bytes.toBytes("wrongRow");
2303
2304    // Setting up region
2305    this.region = initHRegion(tableName, method, CONF, fam1);
2306
2307    try {
2308      region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(value1),
2309        new Put(wrongRow).addColumn(fam1, qual1, value1));
2310      fail("should throw DoNotRetryIOException");
2311    } catch (DoNotRetryIOException e) {
2312      assertEquals("Action's getRow must match", e.getMessage());
2313    }
2314
2315    try {
2316      region.checkAndMutate(row,
2317        new SingleColumnValueFilter(fam1, qual1, CompareOperator.EQUAL, value1),
2318        new Put(wrongRow).addColumn(fam1, qual1, value1));
2319      fail("should throw DoNotRetryIOException");
2320    } catch (DoNotRetryIOException e) {
2321      assertEquals("Action's getRow must match", e.getMessage());
2322    }
2323
2324    try {
2325      region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL,
2326        new BinaryComparator(value1),
2327        new RowMutations(wrongRow)
2328          .add((Mutation) new Put(wrongRow)
2329            .addColumn(fam1, qual1, value1))
2330          .add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
2331      fail("should throw DoNotRetryIOException");
2332    } catch (DoNotRetryIOException e) {
2333      assertEquals("Action's getRow must match", e.getMessage());
2334    }
2335
2336    try {
2337      region.checkAndRowMutate(row,
2338        new SingleColumnValueFilter(fam1, qual1, CompareOperator.EQUAL, value1),
2339        new RowMutations(wrongRow)
2340          .add((Mutation) new Put(wrongRow)
2341            .addColumn(fam1, qual1, value1))
2342          .add((Mutation) new Delete(wrongRow).addColumns(fam1, qual2)));
2343      fail("should throw DoNotRetryIOException");
2344    } catch (DoNotRetryIOException e) {
2345      assertEquals("Action's getRow must match", e.getMessage());
2346    }
2347  }
2348
2349  // ////////////////////////////////////////////////////////////////////////////
2350  // Delete tests
2351  // ////////////////////////////////////////////////////////////////////////////
2352  @Test
2353  public void testDelete_multiDeleteColumn() throws IOException {
2354    byte[] row1 = Bytes.toBytes("row1");
2355    byte[] fam1 = Bytes.toBytes("fam1");
2356    byte[] qual = Bytes.toBytes("qualifier");
2357    byte[] value = Bytes.toBytes("value");
2358
2359    Put put = new Put(row1);
2360    put.addColumn(fam1, qual, 1, value);
2361    put.addColumn(fam1, qual, 2, value);
2362
2363    this.region = initHRegion(tableName, method, CONF, fam1);
2364    region.put(put);
2365
2366    // We do support deleting more than 1 'latest' version
2367    Delete delete = new Delete(row1);
2368    delete.addColumn(fam1, qual);
2369    delete.addColumn(fam1, qual);
2370    region.delete(delete);
2371
2372    Get get = new Get(row1);
2373    get.addFamily(fam1);
2374    Result r = region.get(get);
2375    assertEquals(0, r.size());
2376  }
2377
2378  @Test
2379  public void testDelete_CheckFamily() throws IOException {
2380    byte[] row1 = Bytes.toBytes("row1");
2381    byte[] fam1 = Bytes.toBytes("fam1");
2382    byte[] fam2 = Bytes.toBytes("fam2");
2383    byte[] fam3 = Bytes.toBytes("fam3");
2384    byte[] fam4 = Bytes.toBytes("fam4");
2385
2386    // Setting up region
2387    this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
2388    List<Cell> kvs = new ArrayList<>();
2389    kvs.add(new KeyValue(row1, fam4, null, null));
2390
2391    // testing existing family
2392    byte[] family = fam2;
2393    NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2394    deleteMap.put(family, kvs);
2395    region.delete(deleteMap, Durability.SYNC_WAL);
2396
2397    // testing non existing family
2398    boolean ok = false;
2399    family = fam4;
2400    try {
2401      deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2402      deleteMap.put(family, kvs);
2403      region.delete(deleteMap, Durability.SYNC_WAL);
2404    } catch (Exception e) {
2405      ok = true;
2406    }
2407    assertTrue("Family " + new String(family, StandardCharsets.UTF_8) + " does exist", ok);
2408  }
2409
2410  @Test
2411  public void testDelete_mixed() throws IOException, InterruptedException {
2412    byte[] fam = Bytes.toBytes("info");
2413    byte[][] families = { fam };
2414    this.region = initHRegion(tableName, method, CONF, families);
2415    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2416
2417    byte[] row = Bytes.toBytes("table_name");
2418    // column names
2419    byte[] serverinfo = Bytes.toBytes("serverinfo");
2420    byte[] splitA = Bytes.toBytes("splitA");
2421    byte[] splitB = Bytes.toBytes("splitB");
2422
2423    // add some data:
2424    Put put = new Put(row);
2425    put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2426    region.put(put);
2427
2428    put = new Put(row);
2429    put.addColumn(fam, splitB, Bytes.toBytes("reference_B"));
2430    region.put(put);
2431
2432    put = new Put(row);
2433    put.addColumn(fam, serverinfo, Bytes.toBytes("ip_address"));
2434    region.put(put);
2435
2436    // ok now delete a split:
2437    Delete delete = new Delete(row);
2438    delete.addColumns(fam, splitA);
2439    region.delete(delete);
2440
2441    // assert some things:
2442    Get get = new Get(row).addColumn(fam, serverinfo);
2443    Result result = region.get(get);
2444    assertEquals(1, result.size());
2445
2446    get = new Get(row).addColumn(fam, splitA);
2447    result = region.get(get);
2448    assertEquals(0, result.size());
2449
2450    get = new Get(row).addColumn(fam, splitB);
2451    result = region.get(get);
2452    assertEquals(1, result.size());
2453
2454    // Assert that after a delete, I can put.
2455    put = new Put(row);
2456    put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2457    region.put(put);
2458    get = new Get(row);
2459    result = region.get(get);
2460    assertEquals(3, result.size());
2461
2462    // Now delete all... then test I can add stuff back
2463    delete = new Delete(row);
2464    region.delete(delete);
2465    assertEquals(0, region.get(get).size());
2466
2467    region.put(new Put(row).addColumn(fam, splitA, Bytes.toBytes("reference_A")));
2468    result = region.get(get);
2469    assertEquals(1, result.size());
2470  }
2471
2472  @Test
2473  public void testDeleteRowWithFutureTs() throws IOException {
2474    byte[] fam = Bytes.toBytes("info");
2475    byte[][] families = { fam };
2476    this.region = initHRegion(tableName, method, CONF, families);
2477    byte[] row = Bytes.toBytes("table_name");
2478    // column names
2479    byte[] serverinfo = Bytes.toBytes("serverinfo");
2480
2481    // add data in the far future
2482    Put put = new Put(row);
2483    put.addColumn(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value"));
2484    region.put(put);
2485
2486    // now delete something in the present
2487    Delete delete = new Delete(row);
2488    region.delete(delete);
2489
2490    // make sure we still see our data
2491    Get get = new Get(row).addColumn(fam, serverinfo);
2492    Result result = region.get(get);
2493    assertEquals(1, result.size());
2494
2495    // delete the future row
2496    delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3);
2497    region.delete(delete);
2498
2499    // make sure it is gone
2500    get = new Get(row).addColumn(fam, serverinfo);
2501    result = region.get(get);
2502    assertEquals(0, result.size());
2503  }
2504
2505  /**
2506   * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
2507   * the actual timestamp
2508   */
2509  @Test
2510  public void testPutWithLatestTS() throws IOException {
2511    byte[] fam = Bytes.toBytes("info");
2512    byte[][] families = { fam };
2513    this.region = initHRegion(tableName, method, CONF, families);
2514    byte[] row = Bytes.toBytes("row1");
2515    // column names
2516    byte[] qual = Bytes.toBytes("qual");
2517
2518    // add data with LATEST_TIMESTAMP, put without WAL
2519    Put put = new Put(row);
2520    put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2521    region.put(put);
2522
2523    // Make sure it shows up with an actual timestamp
2524    Get get = new Get(row).addColumn(fam, qual);
2525    Result result = region.get(get);
2526    assertEquals(1, result.size());
2527    Cell kv = result.rawCells()[0];
2528    LOG.info("Got: " + kv);
2529    assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2530        kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2531
2532    // Check same with WAL enabled (historically these took different
2533    // code paths, so check both)
2534    row = Bytes.toBytes("row2");
2535    put = new Put(row);
2536    put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2537    region.put(put);
2538
2539    // Make sure it shows up with an actual timestamp
2540    get = new Get(row).addColumn(fam, qual);
2541    result = region.get(get);
2542    assertEquals(1, result.size());
2543    kv = result.rawCells()[0];
2544    LOG.info("Got: " + kv);
2545    assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2546        kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2547  }
2548
2549  /**
2550   * Tests that there is server-side filtering for invalid timestamp upper
2551   * bound. Note that the timestamp lower bound is automatically handled for us
2552   * by the TTL field.
2553   */
2554  @Test
2555  public void testPutWithTsSlop() throws IOException {
2556    byte[] fam = Bytes.toBytes("info");
2557    byte[][] families = { fam };
2558
2559    // add data with a timestamp that is too recent for range. Ensure assert
2560    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
2561    this.region = initHRegion(tableName, method, CONF, families);
2562    boolean caughtExcep = false;
2563    try {
2564      // no TS specified == use latest. should not error
2565      region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), Bytes.toBytes("value")));
2566      // TS out of range. should error
2567      region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"),
2568          System.currentTimeMillis() + 2000, Bytes.toBytes("value")));
2569      fail("Expected IOE for TS out of configured timerange");
2570    } catch (FailedSanityCheckException ioe) {
2571      LOG.debug("Received expected exception", ioe);
2572      caughtExcep = true;
2573    }
2574    assertTrue("Should catch FailedSanityCheckException", caughtExcep);
2575  }
2576
2577  @Test
2578  public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
2579    byte[] fam1 = Bytes.toBytes("columnA");
2580    byte[] fam2 = Bytes.toBytes("columnB");
2581    this.region = initHRegion(tableName, method, CONF, fam1, fam2);
2582    byte[] rowA = Bytes.toBytes("rowA");
2583    byte[] rowB = Bytes.toBytes("rowB");
2584
2585    byte[] value = Bytes.toBytes("value");
2586
2587    Delete delete = new Delete(rowA);
2588    delete.addFamily(fam1);
2589
2590    region.delete(delete);
2591
2592    // now create data.
2593    Put put = new Put(rowA);
2594    put.addColumn(fam2, null, value);
2595    region.put(put);
2596
2597    put = new Put(rowB);
2598    put.addColumn(fam1, null, value);
2599    put.addColumn(fam2, null, value);
2600    region.put(put);
2601
2602    Scan scan = new Scan();
2603    scan.addFamily(fam1).addFamily(fam2);
2604    InternalScanner s = region.getScanner(scan);
2605    List<Cell> results = new ArrayList<>();
2606    s.next(results);
2607    assertTrue(CellUtil.matchingRows(results.get(0), rowA));
2608
2609    results.clear();
2610    s.next(results);
2611    assertTrue(CellUtil.matchingRows(results.get(0), rowB));
2612  }
2613
2614  @Test
2615  public void testDataInMemoryWithoutWAL() throws IOException {
2616    FileSystem fs = FileSystem.get(CONF);
2617    Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
2618    FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
2619    hLog.init();
2620    // This chunk creation is done throughout the code base. Do we want to move it into core?
2621    // It is missing from this test. W/o it we NPE.
2622    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
2623        COLUMN_FAMILY_BYTES);
2624
2625    Cell originalCell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
2626      .setRow(row)
2627      .setFamily(COLUMN_FAMILY_BYTES)
2628      .setQualifier(qual1)
2629      .setTimestamp(System.currentTimeMillis())
2630      .setType(KeyValue.Type.Put.getCode())
2631      .setValue(value1)
2632      .build();
2633    final long originalSize = originalCell.getSerializedSize();
2634
2635    Cell addCell = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY)
2636      .setRow(row)
2637      .setFamily(COLUMN_FAMILY_BYTES)
2638      .setQualifier(qual1)
2639      .setTimestamp(System.currentTimeMillis())
2640      .setType(KeyValue.Type.Put.getCode())
2641      .setValue(Bytes.toBytes("xxxxxxxxxx"))
2642      .build();
2643    final long addSize = addCell.getSerializedSize();
2644
2645    LOG.info("originalSize:" + originalSize
2646      + ", addSize:" + addSize);
2647    // start test. We expect that the addPut's durability will be replaced
2648    // by originalPut's durability.
2649
2650    // case 1:
2651    testDataInMemoryWithoutWAL(region,
2652            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2653            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2654            originalSize + addSize);
2655
2656    // case 2:
2657    testDataInMemoryWithoutWAL(region,
2658            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2659            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2660            originalSize + addSize);
2661
2662    // case 3:
2663    testDataInMemoryWithoutWAL(region,
2664            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2665            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2666            0);
2667
2668    // case 4:
2669    testDataInMemoryWithoutWAL(region,
2670            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2671            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2672            0);
2673  }
2674
2675  private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut,
2676          final Put addPut, long delta) throws IOException {
2677    final long initSize = region.getDataInMemoryWithoutWAL();
2678    // save normalCPHost and replaced by mockedCPHost
2679    RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
2680    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
2681    // Because the preBatchMutate returns void, we can't do usual Mockito when...then form. Must
2682    // do below format (from Mockito doc).
2683    Mockito.doAnswer(new Answer<Void>() {
2684      @Override
2685      public Void answer(InvocationOnMock invocation) throws Throwable {
2686        MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0);
2687        mb.addOperationsFromCP(0, new Mutation[]{addPut});
2688        return null;
2689      }
2690    }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
2691    ColumnFamilyDescriptorBuilder builder = ColumnFamilyDescriptorBuilder.
2692        newBuilder(COLUMN_FAMILY_BYTES);
2693    ScanInfo info = new ScanInfo(CONF, builder.build(), Long.MAX_VALUE,
2694        Long.MAX_VALUE, region.getCellComparator());
2695    Mockito.when(mockedCPHost.preFlushScannerOpen(Mockito.any(HStore.class),
2696        Mockito.any())).thenReturn(info);
2697    Mockito.when(mockedCPHost.preFlush(Mockito.any(), Mockito.any(StoreScanner.class),
2698        Mockito.any())).thenAnswer(i -> i.getArgument(1));
2699    region.setCoprocessorHost(mockedCPHost);
2700
2701    region.put(originalPut);
2702    region.setCoprocessorHost(normalCPHost);
2703    final long finalSize = region.getDataInMemoryWithoutWAL();
2704    assertEquals("finalSize:" + finalSize + ", initSize:"
2705      + initSize + ", delta:" + delta,finalSize, initSize + delta);
2706  }
2707
2708  @Test
2709  public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
2710    Delete delete = new Delete(row);
2711    delete.addColumns(fam1, qual1);
2712    doTestDelete_AndPostInsert(delete);
2713  }
2714
2715  @Test
2716  public void testaddFamily_PostInsert() throws IOException, InterruptedException {
2717    Delete delete = new Delete(row);
2718    delete.addFamily(fam1);
2719    doTestDelete_AndPostInsert(delete);
2720  }
2721
2722  public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
2723    this.region = initHRegion(tableName, method, CONF, fam1);
2724    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2725    Put put = new Put(row);
2726    put.addColumn(fam1, qual1, value1);
2727    region.put(put);
2728
2729    // now delete the value:
2730    region.delete(delete);
2731
2732    // ok put data:
2733    put = new Put(row);
2734    put.addColumn(fam1, qual1, value2);
2735    region.put(put);
2736
2737    // ok get:
2738    Get get = new Get(row);
2739    get.addColumn(fam1, qual1);
2740
2741    Result r = region.get(get);
2742    assertEquals(1, r.size());
2743    assertArrayEquals(value2, r.getValue(fam1, qual1));
2744
2745    // next:
2746    Scan scan = new Scan().withStartRow(row);
2747    scan.addColumn(fam1, qual1);
2748    InternalScanner s = region.getScanner(scan);
2749
2750    List<Cell> results = new ArrayList<>();
2751    assertEquals(false, s.next(results));
2752    assertEquals(1, results.size());
2753    Cell kv = results.get(0);
2754
2755    assertArrayEquals(value2, CellUtil.cloneValue(kv));
2756    assertArrayEquals(fam1, CellUtil.cloneFamily(kv));
2757    assertArrayEquals(qual1, CellUtil.cloneQualifier(kv));
2758    assertArrayEquals(row, CellUtil.cloneRow(kv));
2759  }
2760
2761  @Test
2762  public void testDelete_CheckTimestampUpdated() throws IOException {
2763    byte[] row1 = Bytes.toBytes("row1");
2764    byte[] col1 = Bytes.toBytes("col1");
2765    byte[] col2 = Bytes.toBytes("col2");
2766    byte[] col3 = Bytes.toBytes("col3");
2767
2768    // Setting up region
2769    this.region = initHRegion(tableName, method, CONF, fam1);
2770    // Building checkerList
2771    List<Cell> kvs = new ArrayList<>();
2772    kvs.add(new KeyValue(row1, fam1, col1, null));
2773    kvs.add(new KeyValue(row1, fam1, col2, null));
2774    kvs.add(new KeyValue(row1, fam1, col3, null));
2775
2776    NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2777    deleteMap.put(fam1, kvs);
2778    region.delete(deleteMap, Durability.SYNC_WAL);
2779
2780    // extract the key values out the memstore:
2781    // This is kinda hacky, but better than nothing...
2782    long now = System.currentTimeMillis();
2783    AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore;
2784    Cell firstCell = memstore.getActive().first();
2785    assertTrue(firstCell.getTimestamp() <= now);
2786    now = firstCell.getTimestamp();
2787    for (Cell cell : memstore.getActive().getCellSet()) {
2788      assertTrue(cell.getTimestamp() <= now);
2789      now = cell.getTimestamp();
2790    }
2791  }
2792
2793  // ////////////////////////////////////////////////////////////////////////////
2794  // Get tests
2795  // ////////////////////////////////////////////////////////////////////////////
2796  @Test
2797  public void testGet_FamilyChecker() throws IOException {
2798    byte[] row1 = Bytes.toBytes("row1");
2799    byte[] fam1 = Bytes.toBytes("fam1");
2800    byte[] fam2 = Bytes.toBytes("False");
2801    byte[] col1 = Bytes.toBytes("col1");
2802
2803    // Setting up region
2804    this.region = initHRegion(tableName, method, CONF, fam1);
2805    Get get = new Get(row1);
2806    get.addColumn(fam2, col1);
2807
2808    // Test
2809    try {
2810      region.get(get);
2811      fail("Expecting DoNotRetryIOException in get but did not get any");
2812    } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
2813      LOG.info("Got expected DoNotRetryIOException successfully");
2814    }
2815  }
2816
2817  @Test
2818  public void testGet_Basic() throws IOException {
2819    byte[] row1 = Bytes.toBytes("row1");
2820    byte[] fam1 = Bytes.toBytes("fam1");
2821    byte[] col1 = Bytes.toBytes("col1");
2822    byte[] col2 = Bytes.toBytes("col2");
2823    byte[] col3 = Bytes.toBytes("col3");
2824    byte[] col4 = Bytes.toBytes("col4");
2825    byte[] col5 = Bytes.toBytes("col5");
2826
2827    // Setting up region
2828    this.region = initHRegion(tableName, method, CONF, fam1);
2829    // Add to memstore
2830    Put put = new Put(row1);
2831    put.addColumn(fam1, col1, null);
2832    put.addColumn(fam1, col2, null);
2833    put.addColumn(fam1, col3, null);
2834    put.addColumn(fam1, col4, null);
2835    put.addColumn(fam1, col5, null);
2836    region.put(put);
2837
2838    Get get = new Get(row1);
2839    get.addColumn(fam1, col2);
2840    get.addColumn(fam1, col4);
2841    // Expected result
2842    KeyValue kv1 = new KeyValue(row1, fam1, col2);
2843    KeyValue kv2 = new KeyValue(row1, fam1, col4);
2844    KeyValue[] expected = { kv1, kv2 };
2845
2846    // Test
2847    Result res = region.get(get);
2848    assertEquals(expected.length, res.size());
2849    for (int i = 0; i < res.size(); i++) {
2850      assertTrue(CellUtil.matchingRows(expected[i], res.rawCells()[i]));
2851      assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i]));
2852      assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i]));
2853    }
2854
2855    // Test using a filter on a Get
2856    Get g = new Get(row1);
2857    final int count = 2;
2858    g.setFilter(new ColumnCountGetFilter(count));
2859    res = region.get(g);
2860    assertEquals(count, res.size());
2861  }
2862
2863  @Test
2864  public void testGet_Empty() throws IOException {
2865    byte[] row = Bytes.toBytes("row");
2866    byte[] fam = Bytes.toBytes("fam");
2867
2868    this.region = initHRegion(tableName, method, CONF, fam);
2869    Get get = new Get(row);
2870    get.addFamily(fam);
2871    Result r = region.get(get);
2872
2873    assertTrue(r.isEmpty());
2874  }
2875
2876  @Test
2877  public void testGetWithFilter() throws IOException, InterruptedException {
2878    byte[] row1 = Bytes.toBytes("row1");
2879    byte[] fam1 = Bytes.toBytes("fam1");
2880    byte[] col1 = Bytes.toBytes("col1");
2881    byte[] value1 = Bytes.toBytes("value1");
2882    byte[] value2 = Bytes.toBytes("value2");
2883
2884    final int maxVersions = 3;
2885    ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
2886      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam1);
2887    familyDescriptor.setMaxVersions(maxVersions);
2888    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
2889      new TableDescriptorBuilder.ModifyableTableDescriptor(
2890        TableName.valueOf("testFilterAndColumnTracker"));
2891    tableDescriptor.setColumnFamily(familyDescriptor);
2892    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2893    RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
2894    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
2895    final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
2896    this.region = TEST_UTIL.createLocalHRegion(info, tableDescriptor, wal);
2897
2898    // Put 4 version to memstore
2899    long ts = 0;
2900    Put put = new Put(row1, ts);
2901    put.addColumn(fam1, col1, value1);
2902    region.put(put);
2903    put = new Put(row1, ts + 1);
2904    put.addColumn(fam1, col1, Bytes.toBytes("filter1"));
2905    region.put(put);
2906    put = new Put(row1, ts + 2);
2907    put.addColumn(fam1, col1, Bytes.toBytes("filter2"));
2908    region.put(put);
2909    put = new Put(row1, ts + 3);
2910    put.addColumn(fam1, col1, value2);
2911    region.put(put);
2912
2913    Get get = new Get(row1);
2914    get.readAllVersions();
2915    Result res = region.get(get);
2916    // Get 3 versions, the oldest version has gone from user view
2917    assertEquals(maxVersions, res.size());
2918
2919    get.setFilter(new ValueFilter(CompareOperator.EQUAL, new SubstringComparator("value")));
2920    res = region.get(get);
2921    // When use value filter, the oldest version should still gone from user view and it
2922    // should only return one key vaule
2923    assertEquals(1, res.size());
2924    assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2925    assertEquals(ts + 3, res.rawCells()[0].getTimestamp());
2926
2927    region.flush(true);
2928    region.compact(true);
2929    Thread.sleep(1000);
2930    res = region.get(get);
2931    // After flush and compact, the result should be consistent with previous result
2932    assertEquals(1, res.size());
2933    assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2934  }
2935
2936  // ////////////////////////////////////////////////////////////////////////////
2937  // Scanner tests
2938  // ////////////////////////////////////////////////////////////////////////////
2939  @Test
2940  public void testGetScanner_WithOkFamilies() throws IOException {
2941    byte[] fam1 = Bytes.toBytes("fam1");
2942    byte[] fam2 = Bytes.toBytes("fam2");
2943
2944    byte[][] families = { fam1, fam2 };
2945
2946    // Setting up region
2947    this.region = initHRegion(tableName, method, CONF, families);
2948    Scan scan = new Scan();
2949    scan.addFamily(fam1);
2950    scan.addFamily(fam2);
2951    try {
2952      region.getScanner(scan);
2953    } catch (Exception e) {
2954      assertTrue("Families could not be found in Region", false);
2955    }
2956  }
2957
2958  @Test
2959  public void testGetScanner_WithNotOkFamilies() throws IOException {
2960    byte[] fam1 = Bytes.toBytes("fam1");
2961    byte[] fam2 = Bytes.toBytes("fam2");
2962
2963    byte[][] families = { fam1 };
2964
2965    // Setting up region
2966    this.region = initHRegion(tableName, method, CONF, families);
2967    Scan scan = new Scan();
2968    scan.addFamily(fam2);
2969    boolean ok = false;
2970    try {
2971      region.getScanner(scan);
2972    } catch (Exception e) {
2973      ok = true;
2974    }
2975    assertTrue("Families could not be found in Region", ok);
2976  }
2977
2978  @Test
2979  public void testGetScanner_WithNoFamilies() throws IOException {
2980    byte[] row1 = Bytes.toBytes("row1");
2981    byte[] fam1 = Bytes.toBytes("fam1");
2982    byte[] fam2 = Bytes.toBytes("fam2");
2983    byte[] fam3 = Bytes.toBytes("fam3");
2984    byte[] fam4 = Bytes.toBytes("fam4");
2985
2986    byte[][] families = { fam1, fam2, fam3, fam4 };
2987
2988    // Setting up region
2989    this.region = initHRegion(tableName, method, CONF, families);
2990    // Putting data in Region
2991    Put put = new Put(row1);
2992    put.addColumn(fam1, null, null);
2993    put.addColumn(fam2, null, null);
2994    put.addColumn(fam3, null, null);
2995    put.addColumn(fam4, null, null);
2996    region.put(put);
2997
2998    Scan scan = null;
2999    HRegion.RegionScannerImpl is = null;
3000
3001    // Testing to see how many scanners that is produced by getScanner,
3002    // starting
3003    // with known number, 2 - current = 1
3004    scan = new Scan();
3005    scan.addFamily(fam2);
3006    scan.addFamily(fam4);
3007    is = region.getScanner(scan);
3008    assertEquals(1, is.storeHeap.getHeap().size());
3009
3010    scan = new Scan();
3011    is = region.getScanner(scan);
3012    assertEquals(families.length - 1, is.storeHeap.getHeap().size());
3013  }
3014
3015  /**
3016   * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
3017   *
3018   * @throws IOException
3019   */
3020  @Test
3021  public void testGetScanner_WithRegionClosed() throws IOException {
3022    byte[] fam1 = Bytes.toBytes("fam1");
3023    byte[] fam2 = Bytes.toBytes("fam2");
3024
3025    byte[][] families = { fam1, fam2 };
3026
3027    // Setting up region
3028    try {
3029      this.region = initHRegion(tableName, method, CONF, families);
3030    } catch (IOException e) {
3031      e.printStackTrace();
3032      fail("Got IOException during initHRegion, " + e.getMessage());
3033    }
3034    region.closed.set(true);
3035    try {
3036      region.getScanner(null);
3037      fail("Expected to get an exception during getScanner on a region that is closed");
3038    } catch (NotServingRegionException e) {
3039      // this is the correct exception that is expected
3040    } catch (IOException e) {
3041      fail("Got wrong type of exception - should be a NotServingRegionException, " +
3042          "but was an IOException: "
3043          + e.getMessage());
3044    }
3045  }
3046
3047  @Test
3048  public void testRegionScanner_Next() throws IOException {
3049    byte[] row1 = Bytes.toBytes("row1");
3050    byte[] row2 = Bytes.toBytes("row2");
3051    byte[] fam1 = Bytes.toBytes("fam1");
3052    byte[] fam2 = Bytes.toBytes("fam2");
3053    byte[] fam3 = Bytes.toBytes("fam3");
3054    byte[] fam4 = Bytes.toBytes("fam4");
3055
3056    byte[][] families = { fam1, fam2, fam3, fam4 };
3057    long ts = System.currentTimeMillis();
3058
3059    // Setting up region
3060    this.region = initHRegion(tableName, method, CONF, families);
3061    // Putting data in Region
3062    Put put = null;
3063    put = new Put(row1);
3064    put.addColumn(fam1, (byte[]) null, ts, null);
3065    put.addColumn(fam2, (byte[]) null, ts, null);
3066    put.addColumn(fam3, (byte[]) null, ts, null);
3067    put.addColumn(fam4, (byte[]) null, ts, null);
3068    region.put(put);
3069
3070    put = new Put(row2);
3071    put.addColumn(fam1, (byte[]) null, ts, null);
3072    put.addColumn(fam2, (byte[]) null, ts, null);
3073    put.addColumn(fam3, (byte[]) null, ts, null);
3074    put.addColumn(fam4, (byte[]) null, ts, null);
3075    region.put(put);
3076
3077    Scan scan = new Scan();
3078    scan.addFamily(fam2);
3079    scan.addFamily(fam4);
3080    InternalScanner is = region.getScanner(scan);
3081
3082    List<Cell> res = null;
3083
3084    // Result 1
3085    List<Cell> expected1 = new ArrayList<>();
3086    expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
3087    expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
3088
3089    res = new ArrayList<>();
3090    is.next(res);
3091    for (int i = 0; i < res.size(); i++) {
3092      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected1.get(i), res.get(i)));
3093    }
3094
3095    // Result 2
3096    List<Cell> expected2 = new ArrayList<>();
3097    expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
3098    expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
3099
3100    res = new ArrayList<>();
3101    is.next(res);
3102    for (int i = 0; i < res.size(); i++) {
3103      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected2.get(i), res.get(i)));
3104    }
3105  }
3106
3107  @Test
3108  public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException {
3109    byte[] row1 = Bytes.toBytes("row1");
3110    byte[] qf1 = Bytes.toBytes("qualifier1");
3111    byte[] qf2 = Bytes.toBytes("qualifier2");
3112    byte[] fam1 = Bytes.toBytes("fam1");
3113    byte[][] families = { fam1 };
3114
3115    long ts1 = System.currentTimeMillis();
3116    long ts2 = ts1 + 1;
3117    long ts3 = ts1 + 2;
3118
3119    // Setting up region
3120    this.region = initHRegion(tableName, method, CONF, families);
3121    // Putting data in Region
3122    Put put = null;
3123    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3124    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3125    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3126
3127    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3128    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3129    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3130
3131    put = new Put(row1);
3132    put.add(kv13);
3133    put.add(kv12);
3134    put.add(kv11);
3135    put.add(kv23);
3136    put.add(kv22);
3137    put.add(kv21);
3138    region.put(put);
3139
3140    // Expected
3141    List<Cell> expected = new ArrayList<>();
3142    expected.add(kv13);
3143    expected.add(kv12);
3144
3145    Scan scan = new Scan().withStartRow(row1);
3146    scan.addColumn(fam1, qf1);
3147    scan.readVersions(MAX_VERSIONS);
3148    List<Cell> actual = new ArrayList<>();
3149    InternalScanner scanner = region.getScanner(scan);
3150
3151    boolean hasNext = scanner.next(actual);
3152    assertEquals(false, hasNext);
3153
3154    // Verify result
3155    for (int i = 0; i < expected.size(); i++) {
3156      assertEquals(expected.get(i), actual.get(i));
3157    }
3158  }
3159
3160  @Test
3161  public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException {
3162    byte[] row1 = Bytes.toBytes("row1");
3163    byte[] qf1 = Bytes.toBytes("qualifier1");
3164    byte[] qf2 = Bytes.toBytes("qualifier2");
3165    byte[] fam1 = Bytes.toBytes("fam1");
3166    byte[][] families = { fam1 };
3167
3168    long ts1 = 1; // System.currentTimeMillis();
3169    long ts2 = ts1 + 1;
3170    long ts3 = ts1 + 2;
3171
3172    // Setting up region
3173    this.region = initHRegion(tableName, method, CONF, families);
3174    // Putting data in Region
3175    Put put = null;
3176    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3177    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3178    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3179
3180    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3181    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3182    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3183
3184    put = new Put(row1);
3185    put.add(kv13);
3186    put.add(kv12);
3187    put.add(kv11);
3188    put.add(kv23);
3189    put.add(kv22);
3190    put.add(kv21);
3191    region.put(put);
3192    region.flush(true);
3193
3194    // Expected
3195    List<Cell> expected = new ArrayList<>();
3196    expected.add(kv13);
3197    expected.add(kv12);
3198    expected.add(kv23);
3199    expected.add(kv22);
3200
3201    Scan scan = new Scan().withStartRow(row1);
3202    scan.addColumn(fam1, qf1);
3203    scan.addColumn(fam1, qf2);
3204    scan.readVersions(MAX_VERSIONS);
3205    List<Cell> actual = new ArrayList<>();
3206    InternalScanner scanner = region.getScanner(scan);
3207
3208    boolean hasNext = scanner.next(actual);
3209    assertEquals(false, hasNext);
3210
3211    // Verify result
3212    for (int i = 0; i < expected.size(); i++) {
3213      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3214    }
3215  }
3216
3217  @Test
3218  public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws
3219      IOException {
3220    byte[] row1 = Bytes.toBytes("row1");
3221    byte[] fam1 = Bytes.toBytes("fam1");
3222    byte[][] families = { fam1 };
3223    byte[] qf1 = Bytes.toBytes("qualifier1");
3224    byte[] qf2 = Bytes.toBytes("qualifier2");
3225
3226    long ts1 = 1;
3227    long ts2 = ts1 + 1;
3228    long ts3 = ts1 + 2;
3229    long ts4 = ts1 + 3;
3230
3231    // Setting up region
3232    this.region = initHRegion(tableName, method, CONF, families);
3233    // Putting data in Region
3234    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3235    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3236    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3237    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3238
3239    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3240    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3241    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3242    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3243
3244    Put put = null;
3245    put = new Put(row1);
3246    put.add(kv14);
3247    put.add(kv24);
3248    region.put(put);
3249    region.flush(true);
3250
3251    put = new Put(row1);
3252    put.add(kv23);
3253    put.add(kv13);
3254    region.put(put);
3255    region.flush(true);
3256
3257    put = new Put(row1);
3258    put.add(kv22);
3259    put.add(kv12);
3260    region.put(put);
3261    region.flush(true);
3262
3263    put = new Put(row1);
3264    put.add(kv21);
3265    put.add(kv11);
3266    region.put(put);
3267
3268    // Expected
3269    List<Cell> expected = new ArrayList<>();
3270    expected.add(kv14);
3271    expected.add(kv13);
3272    expected.add(kv12);
3273    expected.add(kv24);
3274    expected.add(kv23);
3275    expected.add(kv22);
3276
3277    Scan scan = new Scan().withStartRow(row1);
3278    scan.addColumn(fam1, qf1);
3279    scan.addColumn(fam1, qf2);
3280    int versions = 3;
3281    scan.readVersions(versions);
3282    List<Cell> actual = new ArrayList<>();
3283    InternalScanner scanner = region.getScanner(scan);
3284
3285    boolean hasNext = scanner.next(actual);
3286    assertEquals(false, hasNext);
3287
3288    // Verify result
3289    for (int i = 0; i < expected.size(); i++) {
3290      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3291    }
3292  }
3293
3294  @Test
3295  public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException {
3296    byte[] row1 = Bytes.toBytes("row1");
3297    byte[] qf1 = Bytes.toBytes("qualifier1");
3298    byte[] qf2 = Bytes.toBytes("qualifier2");
3299    byte[] fam1 = Bytes.toBytes("fam1");
3300    byte[][] families = { fam1 };
3301
3302    long ts1 = System.currentTimeMillis();
3303    long ts2 = ts1 + 1;
3304    long ts3 = ts1 + 2;
3305
3306    // Setting up region
3307    this.region = initHRegion(tableName, method, CONF, families);
3308    // Putting data in Region
3309    Put put = null;
3310    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3311    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3312    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3313
3314    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3315    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3316    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3317
3318    put = new Put(row1);
3319    put.add(kv13);
3320    put.add(kv12);
3321    put.add(kv11);
3322    put.add(kv23);
3323    put.add(kv22);
3324    put.add(kv21);
3325    region.put(put);
3326
3327    // Expected
3328    List<Cell> expected = new ArrayList<>();
3329    expected.add(kv13);
3330    expected.add(kv12);
3331    expected.add(kv23);
3332    expected.add(kv22);
3333
3334    Scan scan = new Scan().withStartRow(row1);
3335    scan.addFamily(fam1);
3336    scan.readVersions(MAX_VERSIONS);
3337    List<Cell> actual = new ArrayList<>();
3338    InternalScanner scanner = region.getScanner(scan);
3339
3340    boolean hasNext = scanner.next(actual);
3341    assertEquals(false, hasNext);
3342
3343    // Verify result
3344    for (int i = 0; i < expected.size(); i++) {
3345      assertEquals(expected.get(i), actual.get(i));
3346    }
3347  }
3348
3349  @Test
3350  public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException {
3351    byte[] row1 = Bytes.toBytes("row1");
3352    byte[] qf1 = Bytes.toBytes("qualifier1");
3353    byte[] qf2 = Bytes.toBytes("qualifier2");
3354    byte[] fam1 = Bytes.toBytes("fam1");
3355
3356    long ts1 = 1; // System.currentTimeMillis();
3357    long ts2 = ts1 + 1;
3358    long ts3 = ts1 + 2;
3359
3360    // Setting up region
3361    this.region = initHRegion(tableName, method, CONF, fam1);
3362    // Putting data in Region
3363    Put put = null;
3364    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3365    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3366    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3367
3368    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3369    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3370    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3371
3372    put = new Put(row1);
3373    put.add(kv13);
3374    put.add(kv12);
3375    put.add(kv11);
3376    put.add(kv23);
3377    put.add(kv22);
3378    put.add(kv21);
3379    region.put(put);
3380    region.flush(true);
3381
3382    // Expected
3383    List<Cell> expected = new ArrayList<>();
3384    expected.add(kv13);
3385    expected.add(kv12);
3386    expected.add(kv23);
3387    expected.add(kv22);
3388
3389    Scan scan = new Scan().withStartRow(row1);
3390    scan.addFamily(fam1);
3391    scan.readVersions(MAX_VERSIONS);
3392    List<Cell> actual = new ArrayList<>();
3393    InternalScanner scanner = region.getScanner(scan);
3394
3395    boolean hasNext = scanner.next(actual);
3396    assertEquals(false, hasNext);
3397
3398    // Verify result
3399    for (int i = 0; i < expected.size(); i++) {
3400      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3401    }
3402  }
3403
3404  @Test
3405  public void testScanner_StopRow1542() throws IOException {
3406    byte[] family = Bytes.toBytes("testFamily");
3407    this.region = initHRegion(tableName, method, CONF, family);
3408    byte[] row1 = Bytes.toBytes("row111");
3409    byte[] row2 = Bytes.toBytes("row222");
3410    byte[] row3 = Bytes.toBytes("row333");
3411    byte[] row4 = Bytes.toBytes("row444");
3412    byte[] row5 = Bytes.toBytes("row555");
3413
3414    byte[] col1 = Bytes.toBytes("Pub111");
3415    byte[] col2 = Bytes.toBytes("Pub222");
3416
3417    Put put = new Put(row1);
3418    put.addColumn(family, col1, Bytes.toBytes(10L));
3419    region.put(put);
3420
3421    put = new Put(row2);
3422    put.addColumn(family, col1, Bytes.toBytes(15L));
3423    region.put(put);
3424
3425    put = new Put(row3);
3426    put.addColumn(family, col2, Bytes.toBytes(20L));
3427    region.put(put);
3428
3429    put = new Put(row4);
3430    put.addColumn(family, col2, Bytes.toBytes(30L));
3431    region.put(put);
3432
3433    put = new Put(row5);
3434    put.addColumn(family, col1, Bytes.toBytes(40L));
3435    region.put(put);
3436
3437    Scan scan = new Scan().withStartRow(row3).withStopRow(row4);
3438    scan.readAllVersions();
3439    scan.addColumn(family, col1);
3440    InternalScanner s = region.getScanner(scan);
3441
3442    List<Cell> results = new ArrayList<>();
3443    assertEquals(false, s.next(results));
3444    assertEquals(0, results.size());
3445  }
3446
3447  @Test
3448  public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
3449    byte[] row1 = Bytes.toBytes("row1");
3450    byte[] fam1 = Bytes.toBytes("fam1");
3451    byte[] qf1 = Bytes.toBytes("qualifier1");
3452    byte[] qf2 = Bytes.toBytes("quateslifier2");
3453
3454    long ts1 = 1;
3455    long ts2 = ts1 + 1;
3456    long ts3 = ts1 + 2;
3457    long ts4 = ts1 + 3;
3458
3459    // Setting up region
3460    this.region = initHRegion(tableName, method, CONF, fam1);
3461    // Putting data in Region
3462    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3463    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3464    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3465    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3466
3467    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3468    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3469    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3470    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3471
3472    Put put = null;
3473    put = new Put(row1);
3474    put.add(kv14);
3475    put.add(kv24);
3476    region.put(put);
3477    region.flush(true);
3478
3479    put = new Put(row1);
3480    put.add(kv23);
3481    put.add(kv13);
3482    region.put(put);
3483    region.flush(true);
3484
3485    put = new Put(row1);
3486    put.add(kv22);
3487    put.add(kv12);
3488    region.put(put);
3489    region.flush(true);
3490
3491    put = new Put(row1);
3492    put.add(kv21);
3493    put.add(kv11);
3494    region.put(put);
3495
3496    // Expected
3497    List<KeyValue> expected = new ArrayList<>();
3498    expected.add(kv14);
3499    expected.add(kv13);
3500    expected.add(kv12);
3501    expected.add(kv24);
3502    expected.add(kv23);
3503    expected.add(kv22);
3504
3505    Scan scan = new Scan().withStartRow(row1);
3506    int versions = 3;
3507    scan.readVersions(versions);
3508    List<Cell> actual = new ArrayList<>();
3509    InternalScanner scanner = region.getScanner(scan);
3510
3511    boolean hasNext = scanner.next(actual);
3512    assertEquals(false, hasNext);
3513
3514    // Verify result
3515    for (int i = 0; i < expected.size(); i++) {
3516      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3517    }
3518  }
3519
3520  /**
3521   * Added for HBASE-5416
3522   *
3523   * Here we test scan optimization when only subset of CFs are used in filter
3524   * conditions.
3525   */
3526  @Test
3527  public void testScanner_JoinedScanners() throws IOException {
3528    byte[] cf_essential = Bytes.toBytes("essential");
3529    byte[] cf_joined = Bytes.toBytes("joined");
3530    byte[] cf_alpha = Bytes.toBytes("alpha");
3531    this.region = initHRegion(tableName, method, CONF, cf_essential, cf_joined, cf_alpha);
3532    byte[] row1 = Bytes.toBytes("row1");
3533    byte[] row2 = Bytes.toBytes("row2");
3534    byte[] row3 = Bytes.toBytes("row3");
3535
3536    byte[] col_normal = Bytes.toBytes("d");
3537    byte[] col_alpha = Bytes.toBytes("a");
3538
3539    byte[] filtered_val = Bytes.toBytes(3);
3540
3541    Put put = new Put(row1);
3542    put.addColumn(cf_essential, col_normal, Bytes.toBytes(1));
3543    put.addColumn(cf_joined, col_alpha, Bytes.toBytes(1));
3544    region.put(put);
3545
3546    put = new Put(row2);
3547    put.addColumn(cf_essential, col_alpha, Bytes.toBytes(2));
3548    put.addColumn(cf_joined, col_normal, Bytes.toBytes(2));
3549    put.addColumn(cf_alpha, col_alpha, Bytes.toBytes(2));
3550    region.put(put);
3551
3552    put = new Put(row3);
3553    put.addColumn(cf_essential, col_normal, filtered_val);
3554    put.addColumn(cf_joined, col_normal, filtered_val);
3555    region.put(put);
3556
3557    // Check two things:
3558    // 1. result list contains expected values
3559    // 2. result list is sorted properly
3560
3561    Scan scan = new Scan();
3562    Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
3563            CompareOperator.NOT_EQUAL, filtered_val);
3564    scan.setFilter(filter);
3565    scan.setLoadColumnFamiliesOnDemand(true);
3566    InternalScanner s = region.getScanner(scan);
3567
3568    List<Cell> results = new ArrayList<>();
3569    assertTrue(s.next(results));
3570    assertEquals(1, results.size());
3571    results.clear();
3572
3573    assertTrue(s.next(results));
3574    assertEquals(3, results.size());
3575    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha));
3576    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential));
3577    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined));
3578    results.clear();
3579
3580    assertFalse(s.next(results));
3581    assertEquals(0, results.size());
3582  }
3583
3584  /**
3585   * HBASE-5416
3586   *
3587   * Test case when scan limits amount of KVs returned on each next() call.
3588   */
3589  @Test
3590  public void testScanner_JoinedScannersWithLimits() throws IOException {
3591    final byte[] cf_first = Bytes.toBytes("first");
3592    final byte[] cf_second = Bytes.toBytes("second");
3593
3594    this.region = initHRegion(tableName, method, CONF, cf_first, cf_second);
3595    final byte[] col_a = Bytes.toBytes("a");
3596    final byte[] col_b = Bytes.toBytes("b");
3597
3598    Put put;
3599
3600    for (int i = 0; i < 10; i++) {
3601      put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
3602      put.addColumn(cf_first, col_a, Bytes.toBytes(i));
3603      if (i < 5) {
3604        put.addColumn(cf_first, col_b, Bytes.toBytes(i));
3605        put.addColumn(cf_second, col_a, Bytes.toBytes(i));
3606        put.addColumn(cf_second, col_b, Bytes.toBytes(i));
3607      }
3608      region.put(put);
3609    }
3610
3611    Scan scan = new Scan();
3612    scan.setLoadColumnFamiliesOnDemand(true);
3613    Filter bogusFilter = new FilterBase() {
3614      @Override
3615      public ReturnCode filterCell(final Cell ignored) throws IOException {
3616        return ReturnCode.INCLUDE;
3617      }
3618      @Override
3619      public boolean isFamilyEssential(byte[] name) {
3620        return Bytes.equals(name, cf_first);
3621      }
3622    };
3623
3624    scan.setFilter(bogusFilter);
3625    InternalScanner s = region.getScanner(scan);
3626
3627    // Our data looks like this:
3628    // r0: first:a, first:b, second:a, second:b
3629    // r1: first:a, first:b, second:a, second:b
3630    // r2: first:a, first:b, second:a, second:b
3631    // r3: first:a, first:b, second:a, second:b
3632    // r4: first:a, first:b, second:a, second:b
3633    // r5: first:a
3634    // r6: first:a
3635    // r7: first:a
3636    // r8: first:a
3637    // r9: first:a
3638
3639    // But due to next's limit set to 3, we should get this:
3640    // r0: first:a, first:b, second:a
3641    // r0: second:b
3642    // r1: first:a, first:b, second:a
3643    // r1: second:b
3644    // r2: first:a, first:b, second:a
3645    // r2: second:b
3646    // r3: first:a, first:b, second:a
3647    // r3: second:b
3648    // r4: first:a, first:b, second:a
3649    // r4: second:b
3650    // r5: first:a
3651    // r6: first:a
3652    // r7: first:a
3653    // r8: first:a
3654    // r9: first:a
3655
3656    List<Cell> results = new ArrayList<>();
3657    int index = 0;
3658    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build();
3659    while (true) {
3660      boolean more = s.next(results, scannerContext);
3661      if ((index >> 1) < 5) {
3662        if (index % 2 == 0) {
3663          assertEquals(3, results.size());
3664        } else {
3665          assertEquals(1, results.size());
3666        }
3667      } else {
3668        assertEquals(1, results.size());
3669      }
3670      results.clear();
3671      index++;
3672      if (!more) {
3673        break;
3674      }
3675    }
3676  }
3677
3678  /**
3679   * Write an HFile block full with Cells whose qualifier that are identical between
3680   * 0 and Short.MAX_VALUE. See HBASE-13329.
3681   * @throws Exception
3682   */
3683  @Test
3684  public void testLongQualifier() throws Exception {
3685    byte[] family = Bytes.toBytes("family");
3686    this.region = initHRegion(tableName, method, CONF, family);
3687    byte[] q = new byte[Short.MAX_VALUE+2];
3688    Arrays.fill(q, 0, q.length-1, (byte)42);
3689    for (byte i=0; i<10; i++) {
3690      Put p = new Put(Bytes.toBytes("row"));
3691      // qualifiers that differ past Short.MAX_VALUE
3692      q[q.length-1]=i;
3693      p.addColumn(family, q, q);
3694      region.put(p);
3695    }
3696    region.flush(false);
3697  }
3698
3699  /**
3700   * Flushes the cache in a thread while scanning. The tests verify that the
3701   * scan is coherent - e.g. the returned results are always of the same or
3702   * later update as the previous results.
3703   *
3704   * @throws IOException
3705   *           scan / compact
3706   * @throws InterruptedException
3707   *           thread join
3708   */
3709  @Test
3710  public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
3711    byte[] family = Bytes.toBytes("family");
3712    int numRows = 1000;
3713    int flushAndScanInterval = 10;
3714    int compactInterval = 10 * flushAndScanInterval;
3715
3716    this.region = initHRegion(tableName, method, CONF, family);
3717    FlushThread flushThread = new FlushThread();
3718    try {
3719      flushThread.start();
3720
3721      Scan scan = new Scan();
3722      scan.addFamily(family);
3723      scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOperator.EQUAL,
3724          new BinaryComparator(Bytes.toBytes(5L))));
3725
3726      int expectedCount = 0;
3727      List<Cell> res = new ArrayList<>();
3728
3729      boolean toggle = true;
3730      for (long i = 0; i < numRows; i++) {
3731        Put put = new Put(Bytes.toBytes(i));
3732        put.setDurability(Durability.SKIP_WAL);
3733        put.addColumn(family, qual1, Bytes.toBytes(i % 10));
3734        region.put(put);
3735
3736        if (i != 0 && i % compactInterval == 0) {
3737          LOG.debug("iteration = " + i+ " ts="+System.currentTimeMillis());
3738          region.compact(true);
3739        }
3740
3741        if (i % 10 == 5L) {
3742          expectedCount++;
3743        }
3744
3745        if (i != 0 && i % flushAndScanInterval == 0) {
3746          res.clear();
3747          InternalScanner scanner = region.getScanner(scan);
3748          if (toggle) {
3749            flushThread.flush();
3750          }
3751          while (scanner.next(res))
3752            ;
3753          if (!toggle) {
3754            flushThread.flush();
3755          }
3756          assertEquals("toggle="+toggle+"i=" + i + " ts="+System.currentTimeMillis(),
3757              expectedCount, res.size());
3758          toggle = !toggle;
3759        }
3760      }
3761
3762    } finally {
3763      try {
3764        flushThread.done();
3765        flushThread.join();
3766        flushThread.checkNoError();
3767      } catch (InterruptedException ie) {
3768        LOG.warn("Caught exception when joining with flushThread", ie);
3769      }
3770      HBaseTestingUtility.closeRegionAndWAL(this.region);
3771      this.region = null;
3772    }
3773  }
3774
3775  protected class FlushThread extends Thread {
3776    private volatile boolean done;
3777    private Throwable error = null;
3778
3779    FlushThread() {
3780      super("FlushThread");
3781    }
3782
3783    public void done() {
3784      done = true;
3785      synchronized (this) {
3786        interrupt();
3787      }
3788    }
3789
3790    public void checkNoError() {
3791      if (error != null) {
3792        assertNull(error);
3793      }
3794    }
3795
3796    @Override
3797    public void run() {
3798      done = false;
3799      while (!done) {
3800        synchronized (this) {
3801          try {
3802            wait();
3803          } catch (InterruptedException ignored) {
3804            if (done) {
3805              break;
3806            }
3807          }
3808        }
3809        try {
3810          region.flush(true);
3811        } catch (IOException e) {
3812          if (!done) {
3813            LOG.error("Error while flushing cache", e);
3814            error = e;
3815          }
3816          break;
3817        } catch (Throwable t) {
3818          LOG.error("Uncaught exception", t);
3819          throw t;
3820        }
3821      }
3822    }
3823
3824    public void flush() {
3825      synchronized (this) {
3826        notify();
3827      }
3828    }
3829  }
3830
3831  /**
3832   * So can be overridden in subclasses.
3833   */
3834  int getNumQualifiersForTestWritesWhileScanning() {
3835    return 100;
3836  }
3837
3838  /**
3839   * So can be overridden in subclasses.
3840   */
3841  int getTestCountForTestWritesWhileScanning() {
3842    return 100;
3843  }
3844
3845  /**
3846   * Writes very wide records and scans for the latest every time.. Flushes and
3847   * compacts the region every now and then to keep things realistic.
3848   *
3849   * @throws IOException
3850   *           by flush / scan / compaction
3851   * @throws InterruptedException
3852   *           when joining threads
3853   */
3854  @Test
3855  public void testWritesWhileScanning() throws IOException, InterruptedException {
3856    int testCount = getTestCountForTestWritesWhileScanning();
3857    int numRows = 1;
3858    int numFamilies = 10;
3859    int numQualifiers = getNumQualifiersForTestWritesWhileScanning();
3860    int flushInterval = 7;
3861    int compactInterval = 5 * flushInterval;
3862    byte[][] families = new byte[numFamilies][];
3863    for (int i = 0; i < numFamilies; i++) {
3864      families[i] = Bytes.toBytes("family" + i);
3865    }
3866    byte[][] qualifiers = new byte[numQualifiers][];
3867    for (int i = 0; i < numQualifiers; i++) {
3868      qualifiers[i] = Bytes.toBytes("qual" + i);
3869    }
3870
3871    this.region = initHRegion(tableName, method, CONF, families);
3872    FlushThread flushThread = new FlushThread();
3873    PutThread putThread = new PutThread(numRows, families, qualifiers);
3874    try {
3875      putThread.start();
3876      putThread.waitForFirstPut();
3877
3878      flushThread.start();
3879
3880      Scan scan = new Scan().withStartRow(Bytes.toBytes("row0"))
3881        .withStopRow(Bytes.toBytes("row1"));
3882
3883      int expectedCount = numFamilies * numQualifiers;
3884      List<Cell> res = new ArrayList<>();
3885
3886      long prevTimestamp = 0L;
3887      for (int i = 0; i < testCount; i++) {
3888
3889        if (i != 0 && i % compactInterval == 0) {
3890          region.compact(true);
3891          for (HStore store : region.getStores()) {
3892            store.closeAndArchiveCompactedFiles();
3893          }
3894        }
3895
3896        if (i != 0 && i % flushInterval == 0) {
3897          flushThread.flush();
3898        }
3899
3900        boolean previousEmpty = res.isEmpty();
3901        res.clear();
3902        try (InternalScanner scanner = region.getScanner(scan)) {
3903          boolean moreRows;
3904          do {
3905            moreRows = scanner.next(res);
3906          } while (moreRows);
3907        }
3908        if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
3909          assertEquals("i=" + i, expectedCount, res.size());
3910          long timestamp = res.get(0).getTimestamp();
3911          assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
3912              timestamp >= prevTimestamp);
3913          prevTimestamp = timestamp;
3914        }
3915      }
3916
3917      putThread.done();
3918
3919      region.flush(true);
3920
3921    } finally {
3922      try {
3923        flushThread.done();
3924        flushThread.join();
3925        flushThread.checkNoError();
3926
3927        putThread.join();
3928        putThread.checkNoError();
3929      } catch (InterruptedException ie) {
3930        LOG.warn("Caught exception when joining with flushThread", ie);
3931      }
3932
3933      try {
3934          HBaseTestingUtility.closeRegionAndWAL(this.region);
3935      } catch (DroppedSnapshotException dse) {
3936        // We could get this on way out because we interrupt the background flusher and it could
3937        // fail anywhere causing a DSE over in the background flusher... only it is not properly
3938        // dealt with so could still be memory hanging out when we get to here -- memory we can't
3939        // flush because the accounting is 'off' since original DSE.
3940      }
3941      this.region = null;
3942    }
3943  }
3944
3945  protected class PutThread extends Thread {
3946    private volatile boolean done;
3947    private volatile int numPutsFinished = 0;
3948
3949    private Throwable error = null;
3950    private int numRows;
3951    private byte[][] families;
3952    private byte[][] qualifiers;
3953
3954    private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
3955      super("PutThread");
3956      this.numRows = numRows;
3957      this.families = families;
3958      this.qualifiers = qualifiers;
3959    }
3960
3961    /**
3962     * Block calling thread until this instance of PutThread has put at least one row.
3963     */
3964    public void waitForFirstPut() throws InterruptedException {
3965      // wait until put thread actually puts some data
3966      while (isAlive() && numPutsFinished == 0) {
3967        checkNoError();
3968        Thread.sleep(50);
3969      }
3970    }
3971
3972    public void done() {
3973      done = true;
3974      synchronized (this) {
3975        interrupt();
3976      }
3977    }
3978
3979    public void checkNoError() {
3980      if (error != null) {
3981        assertNull(error);
3982      }
3983    }
3984
3985    @Override
3986    public void run() {
3987      done = false;
3988      while (!done) {
3989        try {
3990          for (int r = 0; r < numRows; r++) {
3991            byte[] row = Bytes.toBytes("row" + r);
3992            Put put = new Put(row);
3993            put.setDurability(Durability.SKIP_WAL);
3994            byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
3995            for (byte[] family : families) {
3996              for (byte[] qualifier : qualifiers) {
3997                put.addColumn(family, qualifier, numPutsFinished, value);
3998              }
3999            }
4000            region.put(put);
4001            numPutsFinished++;
4002            if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
4003              LOG.debug("put iteration = {}", numPutsFinished);
4004              Delete delete = new Delete(row, (long) numPutsFinished - 30);
4005              region.delete(delete);
4006            }
4007            numPutsFinished++;
4008          }
4009        } catch (InterruptedIOException e) {
4010          // This is fine. It means we are done, or didn't get the lock on time
4011          LOG.info("Interrupted", e);
4012        } catch (IOException e) {
4013          LOG.error("Error while putting records", e);
4014          error = e;
4015          break;
4016        }
4017      }
4018
4019    }
4020
4021  }
4022
4023  /**
4024   * Writes very wide records and gets the latest row every time.. Flushes and
4025   * compacts the region aggressivly to catch issues.
4026   *
4027   * @throws IOException
4028   *           by flush / scan / compaction
4029   * @throws InterruptedException
4030   *           when joining threads
4031   */
4032  @Test
4033  public void testWritesWhileGetting() throws Exception {
4034    int testCount = 50;
4035    int numRows = 1;
4036    int numFamilies = 10;
4037    int numQualifiers = 100;
4038    int compactInterval = 100;
4039    byte[][] families = new byte[numFamilies][];
4040    for (int i = 0; i < numFamilies; i++) {
4041      families[i] = Bytes.toBytes("family" + i);
4042    }
4043    byte[][] qualifiers = new byte[numQualifiers][];
4044    for (int i = 0; i < numQualifiers; i++) {
4045      qualifiers[i] = Bytes.toBytes("qual" + i);
4046    }
4047
4048
4049    // This test flushes constantly and can cause many files to be created,
4050    // possibly
4051    // extending over the ulimit. Make sure compactions are aggressive in
4052    // reducing
4053    // the number of HFiles created.
4054    Configuration conf = HBaseConfiguration.create(CONF);
4055    conf.setInt("hbase.hstore.compaction.min", 1);
4056    conf.setInt("hbase.hstore.compaction.max", 1000);
4057    this.region = initHRegion(tableName, method, conf, families);
4058    PutThread putThread = null;
4059    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
4060    try {
4061      putThread = new PutThread(numRows, families, qualifiers);
4062      putThread.start();
4063      putThread.waitForFirstPut();
4064
4065      // Add a thread that flushes as fast as possible
4066      ctx.addThread(new RepeatingTestThread(ctx) {
4067
4068        @Override
4069        public void doAnAction() throws Exception {
4070          region.flush(true);
4071          // Compact regularly to avoid creating too many files and exceeding
4072          // the ulimit.
4073          region.compact(false);
4074          for (HStore store : region.getStores()) {
4075            store.closeAndArchiveCompactedFiles();
4076          }
4077        }
4078      });
4079      ctx.startThreads();
4080
4081      Get get = new Get(Bytes.toBytes("row0"));
4082      Result result = null;
4083
4084      int expectedCount = numFamilies * numQualifiers;
4085
4086      long prevTimestamp = 0L;
4087      for (int i = 0; i < testCount; i++) {
4088        LOG.info("testWritesWhileGetting verify turn " + i);
4089        boolean previousEmpty = result == null || result.isEmpty();
4090        result = region.get(get);
4091        if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
4092          assertEquals("i=" + i, expectedCount, result.size());
4093          // TODO this was removed, now what dangit?!
4094          // search looking for the qualifier in question?
4095          long timestamp = 0;
4096          for (Cell kv : result.rawCells()) {
4097            if (CellUtil.matchingFamily(kv, families[0])
4098                && CellUtil.matchingQualifier(kv, qualifiers[0])) {
4099              timestamp = kv.getTimestamp();
4100            }
4101          }
4102          assertTrue(timestamp >= prevTimestamp);
4103          prevTimestamp = timestamp;
4104          Cell previousKV = null;
4105
4106          for (Cell kv : result.rawCells()) {
4107            byte[] thisValue = CellUtil.cloneValue(kv);
4108            if (previousKV != null) {
4109              if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) {
4110                LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV
4111                    + "(memStoreTS:" + previousKV.getSequenceId() + ")" + ", New KV: " + kv
4112                    + "(memStoreTS:" + kv.getSequenceId() + ")");
4113                assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue));
4114              }
4115            }
4116            previousKV = kv;
4117          }
4118        }
4119      }
4120    } finally {
4121      if (putThread != null)
4122        putThread.done();
4123
4124      region.flush(true);
4125
4126      if (putThread != null) {
4127        putThread.join();
4128        putThread.checkNoError();
4129      }
4130
4131      ctx.stop();
4132      HBaseTestingUtility.closeRegionAndWAL(this.region);
4133      this.region = null;
4134    }
4135  }
4136
4137  @Test
4138  public void testHolesInMeta() throws Exception {
4139    byte[] family = Bytes.toBytes("family");
4140    this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
4141        false, family);
4142    byte[] rowNotServed = Bytes.toBytes("a");
4143    Get g = new Get(rowNotServed);
4144    try {
4145      region.get(g);
4146      fail();
4147    } catch (WrongRegionException x) {
4148      // OK
4149    }
4150    byte[] row = Bytes.toBytes("y");
4151    g = new Get(row);
4152    region.get(g);
4153  }
4154
4155  @Test
4156  public void testIndexesScanWithOneDeletedRow() throws IOException {
4157    byte[] family = Bytes.toBytes("family");
4158
4159    // Setting up region
4160    this.region = initHRegion(tableName, method, CONF, family);
4161    Put put = new Put(Bytes.toBytes(1L));
4162    put.addColumn(family, qual1, 1L, Bytes.toBytes(1L));
4163    region.put(put);
4164
4165    region.flush(true);
4166
4167    Delete delete = new Delete(Bytes.toBytes(1L), 1L);
4168    region.delete(delete);
4169
4170    put = new Put(Bytes.toBytes(2L));
4171    put.addColumn(family, qual1, 2L, Bytes.toBytes(2L));
4172    region.put(put);
4173
4174    Scan idxScan = new Scan();
4175    idxScan.addFamily(family);
4176    idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList(
4177        new SingleColumnValueFilter(family, qual1, CompareOperator.GREATER_OR_EQUAL,
4178            new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1,
4179                    CompareOperator.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L))))));
4180    InternalScanner scanner = region.getScanner(idxScan);
4181    List<Cell> res = new ArrayList<>();
4182
4183    while (scanner.next(res)) {
4184      // Ignore res value.
4185    }
4186    assertEquals(1L, res.size());
4187  }
4188
4189  // ////////////////////////////////////////////////////////////////////////////
4190  // Bloom filter test
4191  // ////////////////////////////////////////////////////////////////////////////
4192  @Test
4193  public void testBloomFilterSize() throws IOException {
4194    byte[] fam1 = Bytes.toBytes("fam1");
4195    byte[] qf1 = Bytes.toBytes("col");
4196    byte[] val1 = Bytes.toBytes("value1");
4197    // Create Table
4198    ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
4199      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam1)
4200        .setMaxVersions(Integer.MAX_VALUE)
4201        .setBloomFilterType(BloomType.ROWCOL);
4202
4203    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
4204      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
4205    tableDescriptor.setColumnFamily(familyDescriptor);
4206    RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
4207    this.region = TEST_UTIL.createLocalHRegion(info, tableDescriptor);
4208    int num_unique_rows = 10;
4209    int duplicate_multiplier = 2;
4210    int num_storefiles = 4;
4211
4212    int version = 0;
4213    for (int f = 0; f < num_storefiles; f++) {
4214      for (int i = 0; i < duplicate_multiplier; i++) {
4215        for (int j = 0; j < num_unique_rows; j++) {
4216          Put put = new Put(Bytes.toBytes("row" + j));
4217          put.setDurability(Durability.SKIP_WAL);
4218          long ts = version++;
4219          put.addColumn(fam1, qf1, ts, val1);
4220          region.put(put);
4221        }
4222      }
4223      region.flush(true);
4224    }
4225    // before compaction
4226    HStore store = region.getStore(fam1);
4227    Collection<HStoreFile> storeFiles = store.getStorefiles();
4228    for (HStoreFile storefile : storeFiles) {
4229      StoreFileReader reader = storefile.getReader();
4230      reader.loadFileInfo();
4231      reader.loadBloomfilter();
4232      assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
4233      assertEquals(num_unique_rows, reader.getFilterEntries());
4234    }
4235
4236    region.compact(true);
4237
4238    // after compaction
4239    storeFiles = store.getStorefiles();
4240    for (HStoreFile storefile : storeFiles) {
4241      StoreFileReader reader = storefile.getReader();
4242      reader.loadFileInfo();
4243      reader.loadBloomfilter();
4244      assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());
4245      assertEquals(num_unique_rows, reader.getFilterEntries());
4246    }
4247  }
4248
4249  @Test
4250  public void testAllColumnsWithBloomFilter() throws IOException {
4251    byte[] TABLE = Bytes.toBytes(name.getMethodName());
4252    byte[] FAMILY = Bytes.toBytes("family");
4253
4254    // Create table
4255    ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
4256      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(FAMILY)
4257        .setMaxVersions(Integer.MAX_VALUE)
4258        .setBloomFilterType(BloomType.ROWCOL);
4259    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
4260      new TableDescriptorBuilder.ModifyableTableDescriptor(TableName.valueOf(TABLE));
4261    tableDescriptor.setColumnFamily(familyDescriptor);
4262    RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
4263    this.region = TEST_UTIL.createLocalHRegion(info, tableDescriptor);
4264    // For row:0, col:0: insert versions 1 through 5.
4265    byte[] row = Bytes.toBytes("row:" + 0);
4266    byte[] column = Bytes.toBytes("column:" + 0);
4267    Put put = new Put(row);
4268    put.setDurability(Durability.SKIP_WAL);
4269    for (long idx = 1; idx <= 4; idx++) {
4270      put.addColumn(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx));
4271    }
4272    region.put(put);
4273
4274    // Flush
4275    region.flush(true);
4276
4277    // Get rows
4278    Get get = new Get(row);
4279    get.readAllVersions();
4280    Cell[] kvs = region.get(get).rawCells();
4281
4282    // Check if rows are correct
4283    assertEquals(4, kvs.length);
4284    checkOneCell(kvs[0], FAMILY, 0, 0, 4);
4285    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
4286    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
4287    checkOneCell(kvs[3], FAMILY, 0, 0, 1);
4288  }
4289
4290  /**
4291   * Testcase to cover bug-fix for HBASE-2823 Ensures correct delete when
4292   * issuing delete row on columns with bloom filter set to row+col
4293   * (BloomType.ROWCOL)
4294   */
4295  @Test
4296  public void testDeleteRowWithBloomFilter() throws IOException {
4297    byte[] familyName = Bytes.toBytes("familyName");
4298
4299    // Create Table
4300    ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
4301      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(familyName)
4302        .setMaxVersions(Integer.MAX_VALUE)
4303        .setBloomFilterType(BloomType.ROWCOL);
4304
4305    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
4306      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
4307    tableDescriptor.setColumnFamily(familyDescriptor);
4308    RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
4309    this.region = TEST_UTIL.createLocalHRegion(info, tableDescriptor);
4310    // Insert some data
4311    byte[] row = Bytes.toBytes("row1");
4312    byte[] col = Bytes.toBytes("col1");
4313
4314    Put put = new Put(row);
4315    put.addColumn(familyName, col, 1, Bytes.toBytes("SomeRandomValue"));
4316    region.put(put);
4317    region.flush(true);
4318
4319    Delete del = new Delete(row);
4320    region.delete(del);
4321    region.flush(true);
4322
4323    // Get remaining rows (should have none)
4324    Get get = new Get(row);
4325    get.addColumn(familyName, col);
4326
4327    Cell[] keyValues = region.get(get).rawCells();
4328    assertEquals(0, keyValues.length);
4329  }
4330
4331  @Test
4332  public void testgetHDFSBlocksDistribution() throws Exception {
4333    HBaseTestingUtility htu = new HBaseTestingUtility();
4334    // Why do we set the block size in this test?  If we set it smaller than the kvs, then we'll
4335    // break up the file in to more pieces that can be distributed across the three nodes and we
4336    // won't be able to have the condition this test asserts; that at least one node has
4337    // a copy of all replicas -- if small block size, then blocks are spread evenly across the
4338    // the three nodes.  hfilev3 with tags seems to put us over the block size.  St.Ack.
4339    // final int DEFAULT_BLOCK_SIZE = 1024;
4340    // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
4341    htu.getConfiguration().setInt("dfs.replication", 2);
4342
4343    // set up a cluster with 3 nodes
4344    MiniHBaseCluster cluster = null;
4345    String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
4346    int regionServersCount = 3;
4347
4348    try {
4349      StartMiniClusterOption option = StartMiniClusterOption.builder()
4350          .numRegionServers(regionServersCount).dataNodeHosts(dataNodeHosts).build();
4351      cluster = htu.startMiniCluster(option);
4352      byte[][] families = { fam1, fam2 };
4353      Table ht = htu.createTable(tableName, families);
4354
4355      // Setting up region
4356      byte row[] = Bytes.toBytes("row1");
4357      byte col[] = Bytes.toBytes("col1");
4358
4359      Put put = new Put(row);
4360      put.addColumn(fam1, col, 1, Bytes.toBytes("test1"));
4361      put.addColumn(fam2, col, 1, Bytes.toBytes("test2"));
4362      ht.put(put);
4363
4364      HRegion firstRegion = htu.getHBaseCluster().getRegions(tableName).get(0);
4365      firstRegion.flush(true);
4366      HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution();
4367
4368      // Given the default replication factor is 2 and we have 2 HFiles,
4369      // we will have total of 4 replica of blocks on 3 datanodes; thus there
4370      // must be at least one host that have replica for 2 HFiles. That host's
4371      // weight will be equal to the unique block weight.
4372      long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight();
4373      StringBuilder sb = new StringBuilder();
4374      for (String host: blocksDistribution1.getTopHosts()) {
4375        if (sb.length() > 0) sb.append(", ");
4376        sb.append(host);
4377        sb.append("=");
4378        sb.append(blocksDistribution1.getWeight(host));
4379      }
4380
4381      String topHost = blocksDistribution1.getTopHosts().get(0);
4382      long topHostWeight = blocksDistribution1.getWeight(topHost);
4383      String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" +
4384        topHostWeight + ", topHost=" + topHost + "; " + sb.toString();
4385      LOG.info(msg);
4386      assertTrue(msg, uniqueBlocksWeight1 == topHostWeight);
4387
4388      // use the static method to compute the value, it should be the same.
4389      // static method is used by load balancer or other components
4390      HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(
4391          htu.getConfiguration(), firstRegion.getTableDescriptor(), firstRegion.getRegionInfo());
4392      long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight();
4393
4394      assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
4395
4396      ht.close();
4397    } finally {
4398      if (cluster != null) {
4399        htu.shutdownMiniCluster();
4400      }
4401    }
4402  }
4403
4404  /**
4405   * Testcase to check state of region initialization task set to ABORTED or not
4406   * if any exceptions during initialization
4407   *
4408   * @throws Exception
4409   */
4410  @Test
4411  public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception {
4412    RegionInfo info;
4413    try {
4414      FileSystem fs = Mockito.mock(FileSystem.class);
4415      Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException());
4416      TableDescriptorBuilder tableDescriptorBuilder =
4417        TableDescriptorBuilder.newBuilder(tableName);
4418      ColumnFamilyDescriptor columnFamilyDescriptor =
4419        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf")).build();
4420      tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
4421      info = RegionInfoBuilder.newBuilder(tableName).build();
4422      Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
4423      region = HRegion.newHRegion(path, null, fs, CONF, info,
4424        tableDescriptorBuilder.build(), null);
4425      // region initialization throws IOException and set task state to ABORTED.
4426      region.initialize();
4427      fail("Region initialization should fail due to IOException");
4428    } catch (IOException io) {
4429      List<MonitoredTask> tasks = TaskMonitor.get().getTasks();
4430      for (MonitoredTask monitoredTask : tasks) {
4431        if (!(monitoredTask instanceof MonitoredRPCHandler)
4432            && monitoredTask.getDescription().contains(region.toString())) {
4433          assertTrue("Region state should be ABORTED.",
4434              monitoredTask.getState().equals(MonitoredTask.State.ABORTED));
4435          break;
4436        }
4437      }
4438    }
4439  }
4440
4441  /**
4442   * Verifies that the .regioninfo file is written on region creation and that
4443   * is recreated if missing during region opening.
4444   */
4445  @Test
4446  public void testRegionInfoFileCreation() throws IOException {
4447    Path rootDir = new Path(dir + "testRegionInfoFileCreation");
4448
4449    TableDescriptorBuilder tableDescriptorBuilder =
4450      TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()));
4451    ColumnFamilyDescriptor columnFamilyDescriptor =
4452      ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf")).build();
4453    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
4454    TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
4455
4456    RegionInfo hri = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
4457
4458    // Create a region and skip the initialization (like CreateTableHandler)
4459    region = HBaseTestingUtility.createRegionAndWAL(hri, rootDir, CONF,
4460      tableDescriptor, false);
4461    Path regionDir = region.getRegionFileSystem().getRegionDir();
4462    FileSystem fs = region.getRegionFileSystem().getFileSystem();
4463    HBaseTestingUtility.closeRegionAndWAL(region);
4464
4465    Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
4466
4467    // Verify that the .regioninfo file is present
4468    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4469        fs.exists(regionInfoFile));
4470
4471    // Try to open the region
4472    region = HRegion.openHRegion(rootDir, hri, tableDescriptor, null, CONF);
4473    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4474    HBaseTestingUtility.closeRegionAndWAL(region);
4475
4476    // Verify that the .regioninfo file is still there
4477    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4478        fs.exists(regionInfoFile));
4479
4480    // Remove the .regioninfo file and verify is recreated on region open
4481    fs.delete(regionInfoFile, true);
4482    assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
4483        fs.exists(regionInfoFile));
4484
4485    region = HRegion.openHRegion(rootDir, hri, tableDescriptor, null, CONF);
4486//    region = TEST_UTIL.openHRegion(hri, htd);
4487    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4488    HBaseTestingUtility.closeRegionAndWAL(region);
4489
4490    // Verify that the .regioninfo file is still there
4491    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4492        fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE)));
4493
4494    region = null;
4495  }
4496
4497  /**
4498   * TestCase for increment
4499   */
4500  private static class Incrementer implements Runnable {
4501    private HRegion region;
4502    private final static byte[] incRow = Bytes.toBytes("incRow");
4503    private final static byte[] family = Bytes.toBytes("family");
4504    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4505    private final static long ONE = 1L;
4506    private int incCounter;
4507
4508    public Incrementer(HRegion region, int incCounter) {
4509      this.region = region;
4510      this.incCounter = incCounter;
4511    }
4512
4513    @Override
4514    public void run() {
4515      int count = 0;
4516      while (count < incCounter) {
4517        Increment inc = new Increment(incRow);
4518        inc.addColumn(family, qualifier, ONE);
4519        count++;
4520        try {
4521          region.increment(inc);
4522        } catch (IOException e) {
4523          LOG.info("Count=" + count + ", " + e);
4524          break;
4525        }
4526      }
4527    }
4528  }
4529
4530  /**
4531   * Test case to check increment function with memstore flushing
4532   * @throws Exception
4533   */
4534  @Test
4535  public void testParallelIncrementWithMemStoreFlush() throws Exception {
4536    byte[] family = Incrementer.family;
4537    this.region = initHRegion(tableName, method, CONF, family);
4538    final HRegion region = this.region;
4539    final AtomicBoolean incrementDone = new AtomicBoolean(false);
4540    Runnable flusher = new Runnable() {
4541      @Override
4542      public void run() {
4543        while (!incrementDone.get()) {
4544          try {
4545            region.flush(true);
4546          } catch (Exception e) {
4547            e.printStackTrace();
4548          }
4549        }
4550      }
4551    };
4552
4553    // after all increment finished, the row will increment to 20*100 = 2000
4554    int threadNum = 20;
4555    int incCounter = 100;
4556    long expected = (long) threadNum * incCounter;
4557    Thread[] incrementers = new Thread[threadNum];
4558    Thread flushThread = new Thread(flusher);
4559    for (int i = 0; i < threadNum; i++) {
4560      incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
4561      incrementers[i].start();
4562    }
4563    flushThread.start();
4564    for (int i = 0; i < threadNum; i++) {
4565      incrementers[i].join();
4566    }
4567
4568    incrementDone.set(true);
4569    flushThread.join();
4570
4571    Get get = new Get(Incrementer.incRow);
4572    get.addColumn(Incrementer.family, Incrementer.qualifier);
4573    get.readVersions(1);
4574    Result res = this.region.get(get);
4575    List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
4576
4577    // we just got the latest version
4578    assertEquals(1, kvs.size());
4579    Cell kv = kvs.get(0);
4580    assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
4581  }
4582
4583  /**
4584   * TestCase for append
4585   */
4586  private static class Appender implements Runnable {
4587    private HRegion region;
4588    private final static byte[] appendRow = Bytes.toBytes("appendRow");
4589    private final static byte[] family = Bytes.toBytes("family");
4590    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4591    private final static byte[] CHAR = Bytes.toBytes("a");
4592    private int appendCounter;
4593
4594    public Appender(HRegion region, int appendCounter) {
4595      this.region = region;
4596      this.appendCounter = appendCounter;
4597    }
4598
4599    @Override
4600    public void run() {
4601      int count = 0;
4602      while (count < appendCounter) {
4603        Append app = new Append(appendRow);
4604        app.addColumn(family, qualifier, CHAR);
4605        count++;
4606        try {
4607          region.append(app);
4608        } catch (IOException e) {
4609          LOG.info("Count=" + count + ", max=" + appendCounter + ", " + e);
4610          break;
4611        }
4612      }
4613    }
4614  }
4615
4616  /**
4617   * Test case to check append function with memstore flushing
4618   * @throws Exception
4619   */
4620  @Test
4621  public void testParallelAppendWithMemStoreFlush() throws Exception {
4622    byte[] family = Appender.family;
4623    this.region = initHRegion(tableName, method, CONF, family);
4624    final HRegion region = this.region;
4625    final AtomicBoolean appendDone = new AtomicBoolean(false);
4626    Runnable flusher = new Runnable() {
4627      @Override
4628      public void run() {
4629        while (!appendDone.get()) {
4630          try {
4631            region.flush(true);
4632          } catch (Exception e) {
4633            e.printStackTrace();
4634          }
4635        }
4636      }
4637    };
4638
4639    // After all append finished, the value will append to threadNum *
4640    // appendCounter Appender.CHAR
4641    int threadNum = 20;
4642    int appendCounter = 100;
4643    byte[] expected = new byte[threadNum * appendCounter];
4644    for (int i = 0; i < threadNum * appendCounter; i++) {
4645      System.arraycopy(Appender.CHAR, 0, expected, i, 1);
4646    }
4647    Thread[] appenders = new Thread[threadNum];
4648    Thread flushThread = new Thread(flusher);
4649    for (int i = 0; i < threadNum; i++) {
4650      appenders[i] = new Thread(new Appender(this.region, appendCounter));
4651      appenders[i].start();
4652    }
4653    flushThread.start();
4654    for (int i = 0; i < threadNum; i++) {
4655      appenders[i].join();
4656    }
4657
4658    appendDone.set(true);
4659    flushThread.join();
4660
4661    Get get = new Get(Appender.appendRow);
4662    get.addColumn(Appender.family, Appender.qualifier);
4663    get.readVersions(1);
4664    Result res = this.region.get(get);
4665    List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
4666
4667    // we just got the latest version
4668    assertEquals(1, kvs.size());
4669    Cell kv = kvs.get(0);
4670    byte[] appendResult = new byte[kv.getValueLength()];
4671    System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
4672    assertArrayEquals(expected, appendResult);
4673  }
4674
4675  /**
4676   * Test case to check put function with memstore flushing for same row, same ts
4677   * @throws Exception
4678   */
4679  @Test
4680  public void testPutWithMemStoreFlush() throws Exception {
4681    byte[] family = Bytes.toBytes("family");
4682    byte[] qualifier = Bytes.toBytes("qualifier");
4683    byte[] row = Bytes.toBytes("putRow");
4684    byte[] value = null;
4685    this.region = initHRegion(tableName, method, CONF, family);
4686    Put put = null;
4687    Get get = null;
4688    List<Cell> kvs = null;
4689    Result res = null;
4690
4691    put = new Put(row);
4692    value = Bytes.toBytes("value0");
4693    put.addColumn(family, qualifier, 1234567L, value);
4694    region.put(put);
4695    get = new Get(row);
4696    get.addColumn(family, qualifier);
4697    get.readAllVersions();
4698    res = this.region.get(get);
4699    kvs = res.getColumnCells(family, qualifier);
4700    assertEquals(1, kvs.size());
4701    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4702
4703    region.flush(true);
4704    get = new Get(row);
4705    get.addColumn(family, qualifier);
4706    get.readAllVersions();
4707    res = this.region.get(get);
4708    kvs = res.getColumnCells(family, qualifier);
4709    assertEquals(1, kvs.size());
4710    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4711
4712    put = new Put(row);
4713    value = Bytes.toBytes("value1");
4714    put.addColumn(family, qualifier, 1234567L, value);
4715    region.put(put);
4716    get = new Get(row);
4717    get.addColumn(family, qualifier);
4718    get.readAllVersions();
4719    res = this.region.get(get);
4720    kvs = res.getColumnCells(family, qualifier);
4721    assertEquals(1, kvs.size());
4722    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4723
4724    region.flush(true);
4725    get = new Get(row);
4726    get.addColumn(family, qualifier);
4727    get.readAllVersions();
4728    res = this.region.get(get);
4729    kvs = res.getColumnCells(family, qualifier);
4730    assertEquals(1, kvs.size());
4731    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4732  }
4733
4734  @Test
4735  public void testDurability() throws Exception {
4736    // there are 5 x 5 cases:
4737    // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation
4738    // durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT)
4739
4740    // expected cases for append and sync wal
4741    durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4742    durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4743    durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4744
4745    durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4746    durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4747    durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4748
4749    durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4750    durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4751
4752    durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
4753    durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4754
4755    durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
4756    durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
4757    durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
4758
4759    // expected cases for async wal
4760    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4761    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4762    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4763    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4764    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
4765    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4766
4767    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4768    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4769    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4770    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4771    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
4772    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
4773
4774    // expect skip wal cases
4775    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4776    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4777    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4778    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
4779    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
4780    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
4781
4782  }
4783
4784  private void durabilityTest(String method, Durability tableDurability,
4785      Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
4786      final boolean expectSyncFromLogSyncer) throws Exception {
4787    Configuration conf = HBaseConfiguration.create(CONF);
4788    method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
4789    byte[] family = Bytes.toBytes("family");
4790    Path logDir = new Path(new Path(dir + method), "log");
4791    final Configuration walConf = new Configuration(conf);
4792    CommonFSUtils.setRootDir(walConf, logDir);
4793    // XXX: The spied AsyncFSWAL can not work properly because of a Mockito defect that can not
4794    // deal with classes which have a field of an inner class. See discussions in HBASE-15536.
4795    walConf.set(WALFactory.WAL_PROVIDER, "filesystem");
4796    final WALFactory wals = new WALFactory(walConf, HBaseTestingUtility.getRandomUUID().toString());
4797    final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()));
4798    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
4799        HConstants.EMPTY_END_ROW, false, tableDurability, wal,
4800        new byte[][] { family });
4801
4802    Put put = new Put(Bytes.toBytes("r1"));
4803    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
4804    put.setDurability(mutationDurability);
4805    region.put(put);
4806
4807    // verify append called or not
4808    verify(wal, expectAppend ? times(1) : never()).appendData((RegionInfo) any(),
4809      (WALKeyImpl) any(), (WALEdit) any());
4810
4811    // verify sync called or not
4812    if (expectSync || expectSyncFromLogSyncer) {
4813      TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
4814        @Override
4815        public boolean evaluate() throws Exception {
4816          try {
4817            if (expectSync) {
4818              verify(wal, times(1)).sync(anyLong()); // Hregion calls this one
4819            } else if (expectSyncFromLogSyncer) {
4820              verify(wal, times(1)).sync(); // wal syncer calls this one
4821            }
4822          } catch (Throwable ignore) {
4823          }
4824          return true;
4825        }
4826      });
4827    } else {
4828      //verify(wal, never()).sync(anyLong());
4829      verify(wal, never()).sync();
4830    }
4831
4832    HBaseTestingUtility.closeRegionAndWAL(this.region);
4833    wals.close();
4834    this.region = null;
4835  }
4836
4837  @Test
4838  public void testRegionReplicaSecondary() throws IOException {
4839    // create a primary region, load some data and flush
4840    // create a secondary region, and do a get against that
4841    Path rootDir = new Path(dir + name.getMethodName());
4842    CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4843
4844    byte[][] families = new byte[][] {
4845        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4846    };
4847    byte[] cq = Bytes.toBytes("cq");
4848    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
4849      new TableDescriptorBuilder.ModifyableTableDescriptor(
4850        TableName.valueOf(name.getMethodName()));
4851    for (byte[] family : families) {
4852      tableDescriptor.setColumnFamily(
4853        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family));
4854    }
4855
4856    long time = System.currentTimeMillis();
4857    RegionInfo primaryHri = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
4858      .setRegionId(time).setReplicaId(0).build();
4859    RegionInfo secondaryHri = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
4860      .setRegionId(time).setReplicaId(1).build();
4861
4862    HRegion primaryRegion = null, secondaryRegion = null;
4863
4864    try {
4865      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4866          rootDir, TEST_UTIL.getConfiguration(), tableDescriptor);
4867
4868      // load some data
4869      putData(primaryRegion, 0, 1000, cq, families);
4870
4871      // flush region
4872      primaryRegion.flush(true);
4873
4874      // open secondary region
4875      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, tableDescriptor, null, CONF);
4876
4877      verifyData(secondaryRegion, 0, 1000, cq, families);
4878    } finally {
4879      if (primaryRegion != null) {
4880        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4881      }
4882      if (secondaryRegion != null) {
4883        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4884      }
4885    }
4886  }
4887
4888  @Test
4889  public void testRegionReplicaSecondaryIsReadOnly() throws IOException {
4890    // create a primary region, load some data and flush
4891    // create a secondary region, and do a put against that
4892    Path rootDir = new Path(dir + name.getMethodName());
4893    CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4894
4895    byte[][] families = new byte[][] {
4896        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4897    };
4898    byte[] cq = Bytes.toBytes("cq");
4899    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
4900      new TableDescriptorBuilder.ModifyableTableDescriptor(
4901        TableName.valueOf(name.getMethodName()));
4902    for (byte[] family : families) {
4903      tableDescriptor.setColumnFamily(
4904        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family));
4905    }
4906
4907    long time = System.currentTimeMillis();
4908    RegionInfo primaryHri = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
4909      .setRegionId(time).setReplicaId(0).build();
4910    RegionInfo secondaryHri = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
4911      .setRegionId(time).setReplicaId(1).build();
4912
4913    HRegion primaryRegion = null, secondaryRegion = null;
4914
4915    try {
4916      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4917          rootDir, TEST_UTIL.getConfiguration(), tableDescriptor);
4918
4919      // load some data
4920      putData(primaryRegion, 0, 1000, cq, families);
4921
4922      // flush region
4923      primaryRegion.flush(true);
4924
4925      // open secondary region
4926      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, tableDescriptor, null, CONF);
4927
4928      try {
4929        putData(secondaryRegion, 0, 1000, cq, families);
4930        fail("Should have thrown exception");
4931      } catch (IOException ex) {
4932        // expected
4933      }
4934    } finally {
4935      if (primaryRegion != null) {
4936        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4937      }
4938      if (secondaryRegion != null) {
4939        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4940      }
4941    }
4942  }
4943
4944  static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
4945    Configuration confForWAL = new Configuration(conf);
4946    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
4947    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8));
4948  }
4949
4950  @Test
4951  public void testCompactionFromPrimary() throws IOException {
4952    Path rootDir = new Path(dir + name.getMethodName());
4953    CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4954
4955    byte[][] families = new byte[][] {
4956        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4957    };
4958    byte[] cq = Bytes.toBytes("cq");
4959    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
4960      new TableDescriptorBuilder.ModifyableTableDescriptor(
4961        TableName.valueOf(name.getMethodName()));
4962    for (byte[] family : families) {
4963      tableDescriptor.setColumnFamily(
4964        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family));
4965    }
4966
4967    long time = System.currentTimeMillis();
4968    RegionInfo primaryHri = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
4969      .setRegionId(time).setReplicaId(0).build();
4970    RegionInfo secondaryHri = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName())
4971      .setRegionId(time).setReplicaId(1).build();
4972
4973    HRegion primaryRegion = null, secondaryRegion = null;
4974
4975    try {
4976      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4977          rootDir, TEST_UTIL.getConfiguration(), tableDescriptor);
4978
4979      // load some data
4980      putData(primaryRegion, 0, 1000, cq, families);
4981
4982      // flush region
4983      primaryRegion.flush(true);
4984
4985      // open secondary region
4986      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, tableDescriptor, null, CONF);
4987
4988      // move the file of the primary region to the archive, simulating a compaction
4989      Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
4990      primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
4991      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
4992          .getStoreFiles(families[0]);
4993      Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
4994
4995      verifyData(secondaryRegion, 0, 1000, cq, families);
4996    } finally {
4997      if (primaryRegion != null) {
4998        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4999      }
5000      if (secondaryRegion != null) {
5001        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
5002      }
5003    }
5004  }
5005
5006  private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws
5007      IOException {
5008    putData(this.region, startRow, numRows, qf, families);
5009  }
5010
5011  private void putData(HRegion region,
5012      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
5013    putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
5014  }
5015
5016  static void putData(HRegion region, Durability durability,
5017      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
5018    for (int i = startRow; i < startRow + numRows; i++) {
5019      Put put = new Put(Bytes.toBytes("" + i));
5020      put.setDurability(durability);
5021      for (byte[] family : families) {
5022        put.addColumn(family, qf, null);
5023      }
5024      region.put(put);
5025      LOG.info(put.toString());
5026    }
5027  }
5028
5029  static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
5030      throws IOException {
5031    for (int i = startRow; i < startRow + numRows; i++) {
5032      byte[] row = Bytes.toBytes("" + i);
5033      Get get = new Get(row);
5034      for (byte[] family : families) {
5035        get.addColumn(family, qf);
5036      }
5037      Result result = newReg.get(get);
5038      Cell[] raw = result.rawCells();
5039      assertEquals(families.length, result.size());
5040      for (int j = 0; j < families.length; j++) {
5041        assertTrue(CellUtil.matchingRows(raw[j], row));
5042        assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
5043        assertTrue(CellUtil.matchingQualifier(raw[j], qf));
5044      }
5045    }
5046  }
5047
5048  static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
5049    // Now I have k, get values out and assert they are as expected.
5050    Get get = new Get(k).addFamily(family).readAllVersions();
5051    Cell[] results = r.get(get).rawCells();
5052    for (int j = 0; j < results.length; j++) {
5053      byte[] tmp = CellUtil.cloneValue(results[j]);
5054      // Row should be equal to value every time.
5055      assertTrue(Bytes.equals(k, tmp));
5056    }
5057  }
5058
5059  /*
5060   * Assert first value in the passed region is <code>firstValue</code>.
5061   *
5062   * @param r
5063   *
5064   * @param fs
5065   *
5066   * @param firstValue
5067   *
5068   * @throws IOException
5069   */
5070  protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
5071      throws IOException {
5072    byte[][] families = { fs };
5073    Scan scan = new Scan();
5074    for (int i = 0; i < families.length; i++)
5075      scan.addFamily(families[i]);
5076    InternalScanner s = r.getScanner(scan);
5077    try {
5078      List<Cell> curVals = new ArrayList<>();
5079      boolean first = true;
5080      OUTER_LOOP: while (s.next(curVals)) {
5081        for (Cell kv : curVals) {
5082          byte[] val = CellUtil.cloneValue(kv);
5083          byte[] curval = val;
5084          if (first) {
5085            first = false;
5086            assertTrue(Bytes.compareTo(curval, firstValue) == 0);
5087          } else {
5088            // Not asserting anything. Might as well break.
5089            break OUTER_LOOP;
5090          }
5091        }
5092      }
5093    } finally {
5094      s.close();
5095    }
5096  }
5097
5098  /**
5099   * Test that we get the expected flush results back
5100   */
5101  @Test
5102  public void testFlushResult() throws IOException {
5103    byte[] family = Bytes.toBytes("family");
5104
5105    this.region = initHRegion(tableName, method, family);
5106
5107    // empty memstore, flush doesn't run
5108    HRegion.FlushResult fr = region.flush(true);
5109    assertFalse(fr.isFlushSucceeded());
5110    assertFalse(fr.isCompactionNeeded());
5111
5112    // Flush enough files to get up to the threshold, doesn't need compactions
5113    for (int i = 0; i < 2; i++) {
5114      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
5115      region.put(put);
5116      fr = region.flush(true);
5117      assertTrue(fr.isFlushSucceeded());
5118      assertFalse(fr.isCompactionNeeded());
5119    }
5120
5121    // Two flushes after the threshold, compactions are needed
5122    for (int i = 0; i < 2; i++) {
5123      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
5124      region.put(put);
5125      fr = region.flush(true);
5126      assertTrue(fr.isFlushSucceeded());
5127      assertTrue(fr.isCompactionNeeded());
5128    }
5129  }
5130
5131  protected Configuration initSplit() {
5132    // Always compact if there is more than one store file.
5133    CONF.setInt("hbase.hstore.compactionThreshold", 2);
5134
5135    CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
5136
5137    // Increase the amount of time between client retries
5138    CONF.setLong("hbase.client.pause", 15 * 1000);
5139
5140    // This size should make it so we always split using the addContent
5141    // below. After adding all data, the first region is 1.3M
5142    CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
5143    return CONF;
5144  }
5145
5146  /**
5147   * @return A region on which you must call
5148   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5149   */
5150  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
5151      byte[]... families) throws IOException {
5152    return initHRegion(tableName, callingMethod, conf, false, families);
5153  }
5154
5155  /**
5156   * @return A region on which you must call
5157   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5158   */
5159  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
5160      boolean isReadOnly, byte[]... families) throws IOException {
5161    return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
5162  }
5163
5164  protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
5165    String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
5166    throws IOException {
5167    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
5168    RegionInfo hri =
5169      RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
5170    final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
5171    return initHRegion(tableName, startKey, stopKey, isReadOnly, Durability.SYNC_WAL, wal,
5172      families);
5173  }
5174
5175  /**
5176   * @return A region on which you must call
5177   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
5178   */
5179  public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
5180      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
5181    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
5182    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey,
5183        isReadOnly, durability, wal, families);
5184  }
5185
5186  /**
5187   * Assert that the passed in Cell has expected contents for the specified row,
5188   * column & timestamp.
5189   */
5190  private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
5191    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
5192    assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx,
5193        Bytes.toString(CellUtil.cloneRow(kv)));
5194    assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
5195        Bytes.toString(CellUtil.cloneFamily(kv)));
5196    assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
5197        Bytes.toString(CellUtil.cloneQualifier(kv)));
5198    assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
5199    assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
5200        Bytes.toString(CellUtil.cloneValue(kv)));
5201  }
5202
5203  @Test
5204  public void testReverseScanner_FromMemStore_SingleCF_Normal()
5205      throws IOException {
5206    byte[] rowC = Bytes.toBytes("rowC");
5207    byte[] rowA = Bytes.toBytes("rowA");
5208    byte[] rowB = Bytes.toBytes("rowB");
5209    byte[] cf = Bytes.toBytes("CF");
5210    byte[][] families = { cf };
5211    byte[] col = Bytes.toBytes("C");
5212    long ts = 1;
5213    this.region = initHRegion(tableName, method, families);
5214    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5215    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5216        null);
5217    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5218    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5219    Put put = null;
5220    put = new Put(rowC);
5221    put.add(kv1);
5222    put.add(kv11);
5223    region.put(put);
5224    put = new Put(rowA);
5225    put.add(kv2);
5226    region.put(put);
5227    put = new Put(rowB);
5228    put.add(kv3);
5229    region.put(put);
5230
5231    Scan scan = new Scan().withStartRow(rowC);
5232    scan.readVersions(5);
5233    scan.setReversed(true);
5234    InternalScanner scanner = region.getScanner(scan);
5235    List<Cell> currRow = new ArrayList<>();
5236    boolean hasNext = scanner.next(currRow);
5237    assertEquals(2, currRow.size());
5238    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5239        .get(0).getRowLength(), rowC, 0, rowC.length));
5240    assertTrue(hasNext);
5241    currRow.clear();
5242    hasNext = scanner.next(currRow);
5243    assertEquals(1, currRow.size());
5244    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5245        .get(0).getRowLength(), rowB, 0, rowB.length));
5246    assertTrue(hasNext);
5247    currRow.clear();
5248    hasNext = scanner.next(currRow);
5249    assertEquals(1, currRow.size());
5250    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5251        .get(0).getRowLength(), rowA, 0, rowA.length));
5252    assertFalse(hasNext);
5253    scanner.close();
5254  }
5255
5256  @Test
5257  public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
5258      throws IOException {
5259    byte[] rowC = Bytes.toBytes("rowC");
5260    byte[] rowA = Bytes.toBytes("rowA");
5261    byte[] rowB = Bytes.toBytes("rowB");
5262    byte[] rowD = Bytes.toBytes("rowD");
5263    byte[] cf = Bytes.toBytes("CF");
5264    byte[][] families = { cf };
5265    byte[] col = Bytes.toBytes("C");
5266    long ts = 1;
5267    this.region = initHRegion(tableName, method, families);
5268    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5269    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5270        null);
5271    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5272    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5273    Put put = null;
5274    put = new Put(rowC);
5275    put.add(kv1);
5276    put.add(kv11);
5277    region.put(put);
5278    put = new Put(rowA);
5279    put.add(kv2);
5280    region.put(put);
5281    put = new Put(rowB);
5282    put.add(kv3);
5283    region.put(put);
5284
5285    Scan scan = new Scan().withStartRow(rowD);
5286    List<Cell> currRow = new ArrayList<>();
5287    scan.setReversed(true);
5288    scan.readVersions(5);
5289    InternalScanner scanner = region.getScanner(scan);
5290    boolean hasNext = scanner.next(currRow);
5291    assertEquals(2, currRow.size());
5292    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5293        .get(0).getRowLength(), rowC, 0, rowC.length));
5294    assertTrue(hasNext);
5295    currRow.clear();
5296    hasNext = scanner.next(currRow);
5297    assertEquals(1, currRow.size());
5298    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5299        .get(0).getRowLength(), rowB, 0, rowB.length));
5300    assertTrue(hasNext);
5301    currRow.clear();
5302    hasNext = scanner.next(currRow);
5303    assertEquals(1, currRow.size());
5304    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5305        .get(0).getRowLength(), rowA, 0, rowA.length));
5306    assertFalse(hasNext);
5307    scanner.close();
5308  }
5309
5310  @Test
5311  public void testReverseScanner_FromMemStore_SingleCF_FullScan()
5312      throws IOException {
5313    byte[] rowC = Bytes.toBytes("rowC");
5314    byte[] rowA = Bytes.toBytes("rowA");
5315    byte[] rowB = Bytes.toBytes("rowB");
5316    byte[] cf = Bytes.toBytes("CF");
5317    byte[][] families = { cf };
5318    byte[] col = Bytes.toBytes("C");
5319    long ts = 1;
5320    this.region = initHRegion(tableName, method, families);
5321    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5322    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5323        null);
5324    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5325    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5326    Put put = null;
5327    put = new Put(rowC);
5328    put.add(kv1);
5329    put.add(kv11);
5330    region.put(put);
5331    put = new Put(rowA);
5332    put.add(kv2);
5333    region.put(put);
5334    put = new Put(rowB);
5335    put.add(kv3);
5336    region.put(put);
5337    Scan scan = new Scan();
5338    List<Cell> currRow = new ArrayList<>();
5339    scan.setReversed(true);
5340    InternalScanner scanner = region.getScanner(scan);
5341    boolean hasNext = scanner.next(currRow);
5342    assertEquals(1, currRow.size());
5343    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5344        .get(0).getRowLength(), rowC, 0, rowC.length));
5345    assertTrue(hasNext);
5346    currRow.clear();
5347    hasNext = scanner.next(currRow);
5348    assertEquals(1, currRow.size());
5349    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5350        .get(0).getRowLength(), rowB, 0, rowB.length));
5351    assertTrue(hasNext);
5352    currRow.clear();
5353    hasNext = scanner.next(currRow);
5354    assertEquals(1, currRow.size());
5355    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5356        .get(0).getRowLength(), rowA, 0, rowA.length));
5357    assertFalse(hasNext);
5358    scanner.close();
5359  }
5360
5361  @Test
5362  public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
5363    // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
5364    byte[] rowA = Bytes.toBytes("rowA");
5365    byte[] rowB = Bytes.toBytes("rowB");
5366    byte[] rowC = Bytes.toBytes("rowC");
5367    byte[] rowD = Bytes.toBytes("rowD");
5368    byte[] rowE = Bytes.toBytes("rowE");
5369    byte[] cf = Bytes.toBytes("CF");
5370    byte[][] families = { cf };
5371    byte[] col1 = Bytes.toBytes("col1");
5372    byte[] col2 = Bytes.toBytes("col2");
5373    long ts = 1;
5374    this.region = initHRegion(tableName, method, families);
5375    KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5376    KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5377    KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5378    KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5379    KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5380    KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5381    Put put = null;
5382    put = new Put(rowA);
5383    put.add(kv1);
5384    region.put(put);
5385    put = new Put(rowB);
5386    put.add(kv2);
5387    region.put(put);
5388    put = new Put(rowC);
5389    put.add(kv3);
5390    region.put(put);
5391    put = new Put(rowD);
5392    put.add(kv4_1);
5393    region.put(put);
5394    put = new Put(rowD);
5395    put.add(kv4_2);
5396    region.put(put);
5397    put = new Put(rowE);
5398    put.add(kv5);
5399    region.put(put);
5400    region.flush(true);
5401    Scan scan = new Scan().withStartRow(rowD).withStopRow(rowA);
5402    scan.addColumn(families[0], col1);
5403    scan.setReversed(true);
5404    List<Cell> currRow = new ArrayList<>();
5405    InternalScanner scanner = region.getScanner(scan);
5406    boolean hasNext = scanner.next(currRow);
5407    assertEquals(1, currRow.size());
5408    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5409        .get(0).getRowLength(), rowD, 0, rowD.length));
5410    assertTrue(hasNext);
5411    currRow.clear();
5412    hasNext = scanner.next(currRow);
5413    assertEquals(1, currRow.size());
5414    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5415        .get(0).getRowLength(), rowC, 0, rowC.length));
5416    assertTrue(hasNext);
5417    currRow.clear();
5418    hasNext = scanner.next(currRow);
5419    assertEquals(1, currRow.size());
5420    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5421        .get(0).getRowLength(), rowB, 0, rowB.length));
5422    assertFalse(hasNext);
5423    scanner.close();
5424
5425    scan = new Scan().withStartRow(rowD).withStopRow(rowA);
5426    scan.addColumn(families[0], col2);
5427    scan.setReversed(true);
5428    currRow.clear();
5429    scanner = region.getScanner(scan);
5430    hasNext = scanner.next(currRow);
5431    assertEquals(1, currRow.size());
5432    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5433        .get(0).getRowLength(), rowD, 0, rowD.length));
5434    scanner.close();
5435  }
5436
5437  @Test
5438  public void testReverseScanner_smaller_blocksize() throws IOException {
5439    // case to ensure no conflict with HFile index optimization
5440    byte[] rowA = Bytes.toBytes("rowA");
5441    byte[] rowB = Bytes.toBytes("rowB");
5442    byte[] rowC = Bytes.toBytes("rowC");
5443    byte[] rowD = Bytes.toBytes("rowD");
5444    byte[] rowE = Bytes.toBytes("rowE");
5445    byte[] cf = Bytes.toBytes("CF");
5446    byte[][] families = { cf };
5447    byte[] col1 = Bytes.toBytes("col1");
5448    byte[] col2 = Bytes.toBytes("col2");
5449    long ts = 1;
5450    HBaseConfiguration config = new HBaseConfiguration();
5451    config.setInt("test.block.size", 1);
5452    this.region = initHRegion(tableName, method, config, families);
5453    KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5454    KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5455    KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5456    KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5457    KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5458    KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5459    Put put = null;
5460    put = new Put(rowA);
5461    put.add(kv1);
5462    region.put(put);
5463    put = new Put(rowB);
5464    put.add(kv2);
5465    region.put(put);
5466    put = new Put(rowC);
5467    put.add(kv3);
5468    region.put(put);
5469    put = new Put(rowD);
5470    put.add(kv4_1);
5471    region.put(put);
5472    put = new Put(rowD);
5473    put.add(kv4_2);
5474    region.put(put);
5475    put = new Put(rowE);
5476    put.add(kv5);
5477    region.put(put);
5478    region.flush(true);
5479    Scan scan = new Scan().withStartRow(rowD).withStopRow(rowA);
5480    scan.addColumn(families[0], col1);
5481    scan.setReversed(true);
5482    List<Cell> currRow = new ArrayList<>();
5483    InternalScanner scanner = region.getScanner(scan);
5484    boolean hasNext = scanner.next(currRow);
5485    assertEquals(1, currRow.size());
5486    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5487        .get(0).getRowLength(), rowD, 0, rowD.length));
5488    assertTrue(hasNext);
5489    currRow.clear();
5490    hasNext = scanner.next(currRow);
5491    assertEquals(1, currRow.size());
5492    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5493        .get(0).getRowLength(), rowC, 0, rowC.length));
5494    assertTrue(hasNext);
5495    currRow.clear();
5496    hasNext = scanner.next(currRow);
5497    assertEquals(1, currRow.size());
5498    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5499        .get(0).getRowLength(), rowB, 0, rowB.length));
5500    assertFalse(hasNext);
5501    scanner.close();
5502
5503    scan = new Scan().withStartRow(rowD).withStopRow(rowA);
5504    scan.addColumn(families[0], col2);
5505    scan.setReversed(true);
5506    currRow.clear();
5507    scanner = region.getScanner(scan);
5508    hasNext = scanner.next(currRow);
5509    assertEquals(1, currRow.size());
5510    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5511        .get(0).getRowLength(), rowD, 0, rowD.length));
5512    scanner.close();
5513  }
5514
5515  @Test
5516  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
5517      throws IOException {
5518    byte[] row0 = Bytes.toBytes("row0"); // 1 kv
5519    byte[] row1 = Bytes.toBytes("row1"); // 2 kv
5520    byte[] row2 = Bytes.toBytes("row2"); // 4 kv
5521    byte[] row3 = Bytes.toBytes("row3"); // 2 kv
5522    byte[] row4 = Bytes.toBytes("row4"); // 5 kv
5523    byte[] row5 = Bytes.toBytes("row5"); // 2 kv
5524    byte[] cf1 = Bytes.toBytes("CF1");
5525    byte[] cf2 = Bytes.toBytes("CF2");
5526    byte[] cf3 = Bytes.toBytes("CF3");
5527    byte[][] families = { cf1, cf2, cf3 };
5528    byte[] col = Bytes.toBytes("C");
5529    long ts = 1;
5530    HBaseConfiguration conf = new HBaseConfiguration();
5531    // disable compactions in this test.
5532    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5533    this.region = initHRegion(tableName, method, conf, families);
5534    // kv naming style: kv(row number) totalKvCountInThisRow seq no
5535    KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
5536        null);
5537    KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
5538        null);
5539    KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
5540        KeyValue.Type.Put, null);
5541    KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
5542        null);
5543    KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
5544        null);
5545    KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
5546        null);
5547    KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
5548        KeyValue.Type.Put, null);
5549    KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
5550        null);
5551    KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
5552        KeyValue.Type.Put, null);
5553    KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
5554        null);
5555    KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
5556        null);
5557    KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
5558        KeyValue.Type.Put, null);
5559    KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
5560        null);
5561    KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
5562        KeyValue.Type.Put, null);
5563    KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
5564        null);
5565    KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
5566        null);
5567    // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv)
5568    Put put = null;
5569    put = new Put(row1);
5570    put.add(kv1_2_1);
5571    region.put(put);
5572    put = new Put(row2);
5573    put.add(kv2_4_1);
5574    region.put(put);
5575    put = new Put(row4);
5576    put.add(kv4_5_4);
5577    put.add(kv4_5_5);
5578    region.put(put);
5579    region.flush(true);
5580    // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv)
5581    put = new Put(row4);
5582    put.add(kv4_5_1);
5583    put.add(kv4_5_3);
5584    region.put(put);
5585    put = new Put(row1);
5586    put.add(kv1_2_2);
5587    region.put(put);
5588    put = new Put(row2);
5589    put.add(kv2_4_4);
5590    region.put(put);
5591    region.flush(true);
5592    // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv)
5593    put = new Put(row4);
5594    put.add(kv4_5_2);
5595    region.put(put);
5596    put = new Put(row2);
5597    put.add(kv2_4_2);
5598    put.add(kv2_4_3);
5599    region.put(put);
5600    put = new Put(row3);
5601    put.add(kv3_2_2);
5602    region.put(put);
5603    region.flush(true);
5604    // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max)
5605    // ( 2 kv)
5606    put = new Put(row0);
5607    put.add(kv0_1_1);
5608    region.put(put);
5609    put = new Put(row3);
5610    put.add(kv3_2_1);
5611    region.put(put);
5612    put = new Put(row5);
5613    put.add(kv5_2_1);
5614    put.add(kv5_2_2);
5615    region.put(put);
5616    // scan range = ["row4", min), skip the max "row5"
5617    Scan scan = new Scan().withStartRow(row4);
5618    scan.readVersions(5);
5619    scan.setBatch(3);
5620    scan.setReversed(true);
5621    InternalScanner scanner = region.getScanner(scan);
5622    List<Cell> currRow = new ArrayList<>();
5623    boolean hasNext = false;
5624    // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
5625    // included in scan range
5626    // "row4" takes 2 next() calls since batch=3
5627    hasNext = scanner.next(currRow);
5628    assertEquals(3, currRow.size());
5629    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5630        .get(0).getRowLength(), row4, 0, row4.length));
5631    assertTrue(hasNext);
5632    currRow.clear();
5633    hasNext = scanner.next(currRow);
5634    assertEquals(2, currRow.size());
5635    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
5636        currRow.get(0).getRowLength(), row4, 0,
5637      row4.length));
5638    assertTrue(hasNext);
5639    // 2. scan out "row3" (2 kv)
5640    currRow.clear();
5641    hasNext = scanner.next(currRow);
5642    assertEquals(2, currRow.size());
5643    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5644        .get(0).getRowLength(), row3, 0, row3.length));
5645    assertTrue(hasNext);
5646    // 3. scan out "row2" (4 kvs)
5647    // "row2" takes 2 next() calls since batch=3
5648    currRow.clear();
5649    hasNext = scanner.next(currRow);
5650    assertEquals(3, currRow.size());
5651    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5652        .get(0).getRowLength(), row2, 0, row2.length));
5653    assertTrue(hasNext);
5654    currRow.clear();
5655    hasNext = scanner.next(currRow);
5656    assertEquals(1, currRow.size());
5657    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5658        .get(0).getRowLength(), row2, 0, row2.length));
5659    assertTrue(hasNext);
5660    // 4. scan out "row1" (2 kv)
5661    currRow.clear();
5662    hasNext = scanner.next(currRow);
5663    assertEquals(2, currRow.size());
5664    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5665        .get(0).getRowLength(), row1, 0, row1.length));
5666    assertTrue(hasNext);
5667    // 5. scan out "row0" (1 kv)
5668    currRow.clear();
5669    hasNext = scanner.next(currRow);
5670    assertEquals(1, currRow.size());
5671    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5672        .get(0).getRowLength(), row0, 0, row0.length));
5673    assertFalse(hasNext);
5674
5675    scanner.close();
5676  }
5677
5678  @Test
5679  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
5680      throws IOException {
5681    byte[] row1 = Bytes.toBytes("row1");
5682    byte[] row2 = Bytes.toBytes("row2");
5683    byte[] row3 = Bytes.toBytes("row3");
5684    byte[] row4 = Bytes.toBytes("row4");
5685    byte[] cf1 = Bytes.toBytes("CF1");
5686    byte[] cf2 = Bytes.toBytes("CF2");
5687    byte[] cf3 = Bytes.toBytes("CF3");
5688    byte[] cf4 = Bytes.toBytes("CF4");
5689    byte[][] families = { cf1, cf2, cf3, cf4 };
5690    byte[] col = Bytes.toBytes("C");
5691    long ts = 1;
5692    HBaseConfiguration conf = new HBaseConfiguration();
5693    // disable compactions in this test.
5694    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5695    this.region = initHRegion(tableName, method, conf, families);
5696    KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
5697    KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
5698    KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
5699    KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
5700    // storefile1
5701    Put put = new Put(row1);
5702    put.add(kv1);
5703    region.put(put);
5704    region.flush(true);
5705    // storefile2
5706    put = new Put(row2);
5707    put.add(kv2);
5708    region.put(put);
5709    region.flush(true);
5710    // storefile3
5711    put = new Put(row3);
5712    put.add(kv3);
5713    region.put(put);
5714    region.flush(true);
5715    // memstore
5716    put = new Put(row4);
5717    put.add(kv4);
5718    region.put(put);
5719    // scan range = ["row4", min)
5720    Scan scan = new Scan().withStartRow(row4);
5721    scan.setReversed(true);
5722    scan.setBatch(10);
5723    InternalScanner scanner = region.getScanner(scan);
5724    List<Cell> currRow = new ArrayList<>();
5725    boolean hasNext = scanner.next(currRow);
5726    assertEquals(1, currRow.size());
5727    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5728        .get(0).getRowLength(), row4, 0, row4.length));
5729    assertTrue(hasNext);
5730    currRow.clear();
5731    hasNext = scanner.next(currRow);
5732    assertEquals(1, currRow.size());
5733    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5734        .get(0).getRowLength(), row3, 0, row3.length));
5735    assertTrue(hasNext);
5736    currRow.clear();
5737    hasNext = scanner.next(currRow);
5738    assertEquals(1, currRow.size());
5739    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5740        .get(0).getRowLength(), row2, 0, row2.length));
5741    assertTrue(hasNext);
5742    currRow.clear();
5743    hasNext = scanner.next(currRow);
5744    assertEquals(1, currRow.size());
5745    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5746        .get(0).getRowLength(), row1, 0, row1.length));
5747    assertFalse(hasNext);
5748  }
5749
5750  /**
5751   * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking
5752   */
5753  @Test
5754  public void testReverseScanner_StackOverflow() throws IOException {
5755    byte[] cf1 = Bytes.toBytes("CF1");
5756    byte[][] families = {cf1};
5757    byte[] col = Bytes.toBytes("C");
5758    HBaseConfiguration conf = new HBaseConfiguration();
5759    this.region = initHRegion(tableName, method, conf, families);
5760    // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5761    Put put = new Put(Bytes.toBytes("19998"));
5762    put.addColumn(cf1, col, Bytes.toBytes("val"));
5763    region.put(put);
5764    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5765    Put put2 = new Put(Bytes.toBytes("19997"));
5766    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5767    region.put(put2);
5768
5769    Scan scan = new Scan().withStartRow(Bytes.toBytes("19998"));
5770    scan.setReversed(true);
5771    InternalScanner scanner = region.getScanner(scan);
5772
5773    // create one storefile contains many rows will be skipped
5774    // to check StoreFileScanner.seekToPreviousRow
5775    for (int i = 10000; i < 20000; i++) {
5776      Put p = new Put(Bytes.toBytes(""+i));
5777      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5778      region.put(p);
5779    }
5780    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5781
5782    // create one memstore contains many rows will be skipped
5783    // to check MemStoreScanner.seekToPreviousRow
5784    for (int i = 10000; i < 20000; i++) {
5785      Put p = new Put(Bytes.toBytes(""+i));
5786      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5787      region.put(p);
5788    }
5789
5790    List<Cell> currRow = new ArrayList<>();
5791    boolean hasNext;
5792    do {
5793      hasNext = scanner.next(currRow);
5794    } while (hasNext);
5795    assertEquals(2, currRow.size());
5796    assertEquals("19998", Bytes.toString(currRow.get(0).getRowArray(),
5797      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5798    assertEquals("19997", Bytes.toString(currRow.get(1).getRowArray(),
5799      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5800  }
5801
5802  @Test
5803  public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception {
5804    byte[] cf1 = Bytes.toBytes("CF1");
5805    byte[][] families = { cf1 };
5806    byte[] col = Bytes.toBytes("C");
5807    HBaseConfiguration conf = new HBaseConfiguration();
5808    this.region = initHRegion(tableName, method, conf, families);
5809    // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5810    Put put = new Put(Bytes.toBytes("19996"));
5811    put.addColumn(cf1, col, Bytes.toBytes("val"));
5812    region.put(put);
5813    Put put2 = new Put(Bytes.toBytes("19995"));
5814    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5815    region.put(put2);
5816    // create a reverse scan
5817    Scan scan = new Scan().withStartRow(Bytes.toBytes("19996"));
5818    scan.setReversed(true);
5819    RegionScannerImpl scanner = region.getScanner(scan);
5820
5821    // flush the cache. This will reset the store scanner
5822    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5823
5824    // create one memstore contains many rows will be skipped
5825    // to check MemStoreScanner.seekToPreviousRow
5826    for (int i = 10000; i < 20000; i++) {
5827      Put p = new Put(Bytes.toBytes("" + i));
5828      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5829      region.put(p);
5830    }
5831    List<Cell> currRow = new ArrayList<>();
5832    boolean hasNext;
5833    boolean assertDone = false;
5834    do {
5835      hasNext = scanner.next(currRow);
5836      // With HBASE-15871, after the scanner is reset the memstore scanner should not be
5837      // added here
5838      if (!assertDone) {
5839        StoreScanner current =
5840            (StoreScanner) (scanner.storeHeap).getCurrentForTesting();
5841        List<KeyValueScanner> scanners = current.getAllScannersForTesting();
5842        assertEquals("There should be only one scanner the store file scanner", 1,
5843          scanners.size());
5844        assertDone = true;
5845      }
5846    } while (hasNext);
5847    assertEquals(2, currRow.size());
5848    assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(),
5849      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5850    assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(),
5851      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5852  }
5853
5854  @Test
5855  public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception {
5856    byte[] cf1 = Bytes.toBytes("CF1");
5857    byte[][] families = { cf1 };
5858    byte[] col = Bytes.toBytes("C");
5859
5860    HBaseConfiguration conf = new HBaseConfiguration();
5861    this.region = initHRegion(tableName, method, conf, families);
5862
5863    Put put = new Put(Bytes.toBytes("199996"));
5864    put.addColumn(cf1, col, Bytes.toBytes("val"));
5865    region.put(put);
5866    Put put2 = new Put(Bytes.toBytes("199995"));
5867    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5868    region.put(put2);
5869
5870    // Create a reverse scan
5871    Scan scan = new Scan().withStartRow(Bytes.toBytes("199996"));
5872    scan.setReversed(true);
5873    RegionScannerImpl scanner = region.getScanner(scan);
5874
5875    // Put a lot of cells that have sequenceIDs grater than the readPt of the reverse scan
5876    for (int i = 100000; i < 200000; i++) {
5877      Put p = new Put(Bytes.toBytes("" + i));
5878      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5879      region.put(p);
5880    }
5881    List<Cell> currRow = new ArrayList<>();
5882    boolean hasNext;
5883    do {
5884      hasNext = scanner.next(currRow);
5885    } while (hasNext);
5886
5887    assertEquals(2, currRow.size());
5888    assertEquals("199996", Bytes.toString(currRow.get(0).getRowArray(),
5889      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5890    assertEquals("199995", Bytes.toString(currRow.get(1).getRowArray(),
5891      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5892  }
5893
5894  @Test
5895  public void testWriteRequestsCounter() throws IOException {
5896    byte[] fam = Bytes.toBytes("info");
5897    byte[][] families = { fam };
5898    this.region = initHRegion(tableName, method, CONF, families);
5899
5900    Assert.assertEquals(0L, region.getWriteRequestsCount());
5901
5902    Put put = new Put(row);
5903    put.addColumn(fam, fam, fam);
5904
5905    Assert.assertEquals(0L, region.getWriteRequestsCount());
5906    region.put(put);
5907    Assert.assertEquals(1L, region.getWriteRequestsCount());
5908    region.put(put);
5909    Assert.assertEquals(2L, region.getWriteRequestsCount());
5910    region.put(put);
5911    Assert.assertEquals(3L, region.getWriteRequestsCount());
5912
5913    region.delete(new Delete(row));
5914    Assert.assertEquals(4L, region.getWriteRequestsCount());
5915  }
5916
5917  @Test
5918  public void testOpenRegionWrittenToWAL() throws Exception {
5919    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
5920    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5921
5922    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
5923      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
5924      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
5925    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
5926
5927    // open the region w/o rss and wal and flush some files
5928    region =
5929         HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
5930             .getConfiguration(), htd);
5931    assertNotNull(region);
5932
5933    // create a file in fam1 for the region before opening in OpenRegionHandler
5934    region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1));
5935    region.flush(true);
5936    HBaseTestingUtility.closeRegionAndWAL(region);
5937
5938    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5939
5940    // capture append() calls
5941    WAL wal = mockWAL();
5942    when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
5943
5944    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5945      TEST_UTIL.getConfiguration(), rss, null);
5946
5947    verify(wal, times(1)).appendMarker(any(RegionInfo.class), any(WALKeyImpl.class),
5948      editCaptor.capture());
5949
5950    WALEdit edit = editCaptor.getValue();
5951    assertNotNull(edit);
5952    assertNotNull(edit.getCells());
5953    assertEquals(1, edit.getCells().size());
5954    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5955    assertNotNull(desc);
5956
5957    LOG.info("RegionEventDescriptor from WAL: " + desc);
5958
5959    assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
5960    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
5961    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5962      hri.getEncodedNameAsBytes()));
5963    assertTrue(desc.getLogSequenceNumber() > 0);
5964    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5965    assertEquals(2, desc.getStoresCount());
5966
5967    StoreDescriptor store = desc.getStores(0);
5968    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
5969    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
5970    assertEquals(1, store.getStoreFileCount()); // 1store file
5971    assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
5972
5973    store = desc.getStores(1);
5974    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
5975    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
5976    assertEquals(0, store.getStoreFileCount()); // no store files
5977  }
5978
5979  // Helper for test testOpenRegionWrittenToWALForLogReplay
5980  static class HRegionWithSeqId extends HRegion {
5981    public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
5982        final Configuration confParam, final RegionInfo regionInfo,
5983        final TableDescriptor htd, final RegionServerServices rsServices) {
5984      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
5985    }
5986    @Override
5987    protected long getNextSequenceId(WAL wal) throws IOException {
5988      return 42;
5989    }
5990  }
5991
5992  @Test
5993  public void testFlushedFileWithNoTags() throws Exception {
5994    final TableName tableName = TableName.valueOf(name.getMethodName());
5995    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
5996      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
5997    tableDescriptor.setColumnFamily(
5998      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam1));
5999    RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
6000    Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName());
6001    region = HBaseTestingUtility.createRegionAndWAL(info, path,
6002      TEST_UTIL.getConfiguration(), tableDescriptor);
6003    Put put = new Put(Bytes.toBytes("a-b-0-0"));
6004    put.addColumn(fam1, qual1, Bytes.toBytes("c1-value"));
6005    region.put(put);
6006    region.flush(true);
6007    HStore store = region.getStore(fam1);
6008    Collection<HStoreFile> storefiles = store.getStorefiles();
6009    for (HStoreFile sf : storefiles) {
6010      assertFalse("Tags should not be present "
6011          ,sf.getReader().getHFileReader().getFileContext().isIncludesTags());
6012    }
6013  }
6014
6015  /**
6016   * Utility method to setup a WAL mock.
6017   * <p/>
6018   * Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs.
6019   * @return a mock WAL
6020   */
6021  private WAL mockWAL() throws IOException {
6022    WAL wal = mock(WAL.class);
6023    when(wal.appendData(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class)))
6024      .thenAnswer(new Answer<Long>() {
6025        @Override
6026        public Long answer(InvocationOnMock invocation) throws Throwable {
6027          WALKeyImpl key = invocation.getArgument(1);
6028          MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
6029          key.setWriteEntry(we);
6030          return 1L;
6031        }
6032      });
6033    when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))).
6034        thenAnswer(new Answer<Long>() {
6035          @Override
6036          public Long answer(InvocationOnMock invocation) throws Throwable {
6037            WALKeyImpl key = invocation.getArgument(1);
6038            MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
6039            key.setWriteEntry(we);
6040            return 1L;
6041          }
6042        });
6043    return wal;
6044  }
6045
6046  @Test
6047  public void testCloseRegionWrittenToWAL() throws Exception {
6048    Path rootDir = new Path(dir + name.getMethodName());
6049    CommonFSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
6050
6051    final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
6052    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6053
6054    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
6055      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam1))
6056      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam2)).build();
6057    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
6058
6059    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
6060
6061    // capture append() calls
6062    WAL wal = mockWAL();
6063    when(rss.getWAL(any(RegionInfo.class))).thenReturn(wal);
6064
6065
6066    // create and then open a region first so that it can be closed later
6067    region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri));
6068    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
6069      TEST_UTIL.getConfiguration(), rss, null);
6070
6071    // close the region
6072    region.close(false);
6073
6074    // 2 times, one for region open, the other close region
6075    verify(wal, times(2)).appendMarker(any(RegionInfo.class),
6076        (WALKeyImpl) any(WALKeyImpl.class), editCaptor.capture());
6077
6078    WALEdit edit = editCaptor.getAllValues().get(1);
6079    assertNotNull(edit);
6080    assertNotNull(edit.getCells());
6081    assertEquals(1, edit.getCells().size());
6082    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
6083    assertNotNull(desc);
6084
6085    LOG.info("RegionEventDescriptor from WAL: " + desc);
6086
6087    assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType());
6088    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
6089    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
6090      hri.getEncodedNameAsBytes()));
6091    assertTrue(desc.getLogSequenceNumber() > 0);
6092    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
6093    assertEquals(2, desc.getStoresCount());
6094
6095    StoreDescriptor store = desc.getStores(0);
6096    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
6097    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
6098    assertEquals(0, store.getStoreFileCount()); // no store files
6099
6100    store = desc.getStores(1);
6101    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
6102    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
6103    assertEquals(0, store.getStoreFileCount()); // no store files
6104  }
6105
6106  /**
6107   * Test RegionTooBusyException thrown when region is busy
6108   */
6109  @Test
6110  public void testRegionTooBusy() throws IOException {
6111    byte[] family = Bytes.toBytes("family");
6112    long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration",
6113      HRegion.DEFAULT_BUSY_WAIT_DURATION);
6114    CONF.setLong("hbase.busy.wait.duration", 1000);
6115    region = initHRegion(tableName, method, CONF, family);
6116    final AtomicBoolean stopped = new AtomicBoolean(true);
6117    Thread t = new Thread(new Runnable() {
6118      @Override
6119      public void run() {
6120        try {
6121          region.lock.writeLock().lock();
6122          stopped.set(false);
6123          while (!stopped.get()) {
6124            Thread.sleep(100);
6125          }
6126        } catch (InterruptedException ie) {
6127        } finally {
6128          region.lock.writeLock().unlock();
6129        }
6130      }
6131    });
6132    t.start();
6133    Get get = new Get(row);
6134    try {
6135      while (stopped.get()) {
6136        Thread.sleep(100);
6137      }
6138      region.get(get);
6139      fail("Should throw RegionTooBusyException");
6140    } catch (InterruptedException ie) {
6141      fail("test interrupted");
6142    } catch (RegionTooBusyException e) {
6143      // Good, expected
6144    } finally {
6145      stopped.set(true);
6146      try {
6147        t.join();
6148      } catch (Throwable e) {
6149      }
6150
6151      HBaseTestingUtility.closeRegionAndWAL(region);
6152      region = null;
6153      CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration);
6154    }
6155  }
6156
6157  @Test
6158  public void testCellTTLs() throws IOException {
6159    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
6160    EnvironmentEdgeManager.injectEdge(edge);
6161
6162    final byte[] row = Bytes.toBytes("testRow");
6163    final byte[] q1 = Bytes.toBytes("q1");
6164    final byte[] q2 = Bytes.toBytes("q2");
6165    final byte[] q3 = Bytes.toBytes("q3");
6166    final byte[] q4 = Bytes.toBytes("q4");
6167
6168    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
6169      new TableDescriptorBuilder.ModifyableTableDescriptor(
6170        TableName.valueOf(name.getMethodName()));
6171    ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
6172      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam1);
6173    familyDescriptor.setTimeToLive(10); // 10 seconds
6174    tableDescriptor.setColumnFamily(familyDescriptor);
6175
6176    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
6177    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
6178
6179    region = HBaseTestingUtility.createRegionAndWAL(
6180      RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(),
6181      TEST_UTIL.getDataTestDir(), conf, tableDescriptor);
6182    assertNotNull(region);
6183    long now = EnvironmentEdgeManager.currentTime();
6184    // Add a cell that will expire in 5 seconds via cell TTL
6185    region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
6186      HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
6187        // TTL tags specify ts in milliseconds
6188        new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) })));
6189    // Add a cell that will expire after 10 seconds via family setting
6190    region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
6191    // Add a cell that will expire in 15 seconds via cell TTL
6192    region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
6193      HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
6194        // TTL tags specify ts in milliseconds
6195        new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) })));
6196    // Add a cell that will expire in 20 seconds via family setting
6197    region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
6198
6199    // Flush so we are sure store scanning gets this right
6200    region.flush(true);
6201
6202    // A query at time T+0 should return all cells
6203    Result r = region.get(new Get(row));
6204    assertNotNull(r.getValue(fam1, q1));
6205    assertNotNull(r.getValue(fam1, q2));
6206    assertNotNull(r.getValue(fam1, q3));
6207    assertNotNull(r.getValue(fam1, q4));
6208
6209    // Increment time to T+5 seconds
6210    edge.incrementTime(5000);
6211
6212    r = region.get(new Get(row));
6213    assertNull(r.getValue(fam1, q1));
6214    assertNotNull(r.getValue(fam1, q2));
6215    assertNotNull(r.getValue(fam1, q3));
6216    assertNotNull(r.getValue(fam1, q4));
6217
6218    // Increment time to T+10 seconds
6219    edge.incrementTime(5000);
6220
6221    r = region.get(new Get(row));
6222    assertNull(r.getValue(fam1, q1));
6223    assertNull(r.getValue(fam1, q2));
6224    assertNotNull(r.getValue(fam1, q3));
6225    assertNotNull(r.getValue(fam1, q4));
6226
6227    // Increment time to T+15 seconds
6228    edge.incrementTime(5000);
6229
6230    r = region.get(new Get(row));
6231    assertNull(r.getValue(fam1, q1));
6232    assertNull(r.getValue(fam1, q2));
6233    assertNull(r.getValue(fam1, q3));
6234    assertNotNull(r.getValue(fam1, q4));
6235
6236    // Increment time to T+20 seconds
6237    edge.incrementTime(10000);
6238
6239    r = region.get(new Get(row));
6240    assertNull(r.getValue(fam1, q1));
6241    assertNull(r.getValue(fam1, q2));
6242    assertNull(r.getValue(fam1, q3));
6243    assertNull(r.getValue(fam1, q4));
6244
6245    // Fun with disappearing increments
6246
6247    // Start at 1
6248    region.put(new Put(row).addColumn(fam1, q1, Bytes.toBytes(1L)));
6249    r = region.get(new Get(row));
6250    byte[] val = r.getValue(fam1, q1);
6251    assertNotNull(val);
6252    assertEquals(1L, Bytes.toLong(val));
6253
6254    // Increment with a TTL of 5 seconds
6255    Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
6256    incr.setTTL(5000);
6257    region.increment(incr); // 2
6258
6259    // New value should be 2
6260    r = region.get(new Get(row));
6261    val = r.getValue(fam1, q1);
6262    assertNotNull(val);
6263    assertEquals(2L, Bytes.toLong(val));
6264
6265    // Increment time to T+25 seconds
6266    edge.incrementTime(5000);
6267
6268    // Value should be back to 1
6269    r = region.get(new Get(row));
6270    val = r.getValue(fam1, q1);
6271    assertNotNull(val);
6272    assertEquals(1L, Bytes.toLong(val));
6273
6274    // Increment time to T+30 seconds
6275    edge.incrementTime(5000);
6276
6277    // Original value written at T+20 should be gone now via family TTL
6278    r = region.get(new Get(row));
6279    assertNull(r.getValue(fam1, q1));
6280  }
6281
6282  @Test
6283  public void testIncrementTimestampsAreMonotonic() throws IOException {
6284    region = initHRegion(tableName, method, CONF, fam1);
6285    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6286    EnvironmentEdgeManager.injectEdge(edge);
6287
6288    edge.setValue(10);
6289    Increment inc = new Increment(row);
6290    inc.setDurability(Durability.SKIP_WAL);
6291    inc.addColumn(fam1, qual1, 1L);
6292    region.increment(inc);
6293
6294    Result result = region.get(new Get(row));
6295    Cell c = result.getColumnLatestCell(fam1, qual1);
6296    assertNotNull(c);
6297    assertEquals(10L, c.getTimestamp());
6298
6299    edge.setValue(1); // clock goes back
6300    region.increment(inc);
6301    result = region.get(new Get(row));
6302    c = result.getColumnLatestCell(fam1, qual1);
6303    assertEquals(11L, c.getTimestamp());
6304    assertEquals(2L, Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6305  }
6306
6307  @Test
6308  public void testAppendTimestampsAreMonotonic() throws IOException {
6309    region = initHRegion(tableName, method, CONF, fam1);
6310    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6311    EnvironmentEdgeManager.injectEdge(edge);
6312
6313    edge.setValue(10);
6314    Append a = new Append(row);
6315    a.setDurability(Durability.SKIP_WAL);
6316    a.addColumn(fam1, qual1, qual1);
6317    region.append(a);
6318
6319    Result result = region.get(new Get(row));
6320    Cell c = result.getColumnLatestCell(fam1, qual1);
6321    assertNotNull(c);
6322    assertEquals(10L, c.getTimestamp());
6323
6324    edge.setValue(1); // clock goes back
6325    region.append(a);
6326    result = region.get(new Get(row));
6327    c = result.getColumnLatestCell(fam1, qual1);
6328    assertEquals(11L, c.getTimestamp());
6329
6330    byte[] expected = new byte[qual1.length*2];
6331    System.arraycopy(qual1, 0, expected, 0, qual1.length);
6332    System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
6333
6334    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6335      expected, 0, expected.length));
6336  }
6337
6338  @Test
6339  public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
6340    region = initHRegion(tableName, method, CONF, fam1);
6341    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6342    EnvironmentEdgeManager.injectEdge(edge);
6343
6344    edge.setValue(10);
6345    Put p = new Put(row);
6346    p.setDurability(Durability.SKIP_WAL);
6347    p.addColumn(fam1, qual1, qual1);
6348    region.put(p);
6349
6350    Result result = region.get(new Get(row));
6351    Cell c = result.getColumnLatestCell(fam1, qual1);
6352    assertNotNull(c);
6353    assertEquals(10L, c.getTimestamp());
6354
6355    edge.setValue(1); // clock goes back
6356    p = new Put(row);
6357    p.setDurability(Durability.SKIP_WAL);
6358    p.addColumn(fam1, qual1, qual2);
6359    region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p);
6360    result = region.get(new Get(row));
6361    c = result.getColumnLatestCell(fam1, qual1);
6362    assertEquals(10L, c.getTimestamp());
6363
6364    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6365      qual2, 0, qual2.length));
6366  }
6367
6368  @Test
6369  public void testBatchMutateWithWrongRegionException() throws Exception {
6370    final byte[] a = Bytes.toBytes("a");
6371    final byte[] b = Bytes.toBytes("b");
6372    final byte[] c = Bytes.toBytes("c"); // exclusive
6373
6374    int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
6375    CONF.setInt("hbase.rowlock.wait.duration", 1000);
6376    region = initHRegion(tableName, a, c, method, CONF, false, fam1);
6377
6378    Mutation[] mutations = new Mutation[] {
6379        new Put(a)
6380            .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6381              .setRow(a)
6382              .setFamily(fam1)
6383              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6384              .setType(Cell.Type.Put)
6385              .build()),
6386        // this is outside the region boundary
6387        new Put(c).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6388              .setRow(c)
6389              .setFamily(fam1)
6390              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6391              .setType(Type.Put)
6392              .build()),
6393        new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6394              .setRow(b)
6395              .setFamily(fam1)
6396              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6397              .setType(Cell.Type.Put)
6398              .build())
6399    };
6400
6401    OperationStatus[] status = region.batchMutate(mutations);
6402    assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6403    assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, status[1].getOperationStatusCode());
6404    assertEquals(OperationStatusCode.SUCCESS, status[2].getOperationStatusCode());
6405
6406
6407    // test with a row lock held for a long time
6408    final CountDownLatch obtainedRowLock = new CountDownLatch(1);
6409    ExecutorService exec = Executors.newFixedThreadPool(2);
6410    Future<Void> f1 = exec.submit(new Callable<Void>() {
6411      @Override
6412      public Void call() throws Exception {
6413        LOG.info("Acquiring row lock");
6414        RowLock rl = region.getRowLock(b);
6415        obtainedRowLock.countDown();
6416        LOG.info("Waiting for 5 seconds before releasing lock");
6417        Threads.sleep(5000);
6418        LOG.info("Releasing row lock");
6419        rl.release();
6420        return null;
6421      }
6422    });
6423    obtainedRowLock.await(30, TimeUnit.SECONDS);
6424
6425    Future<Void> f2 = exec.submit(new Callable<Void>() {
6426      @Override
6427      public Void call() throws Exception {
6428        Mutation[] mutations = new Mutation[] {
6429            new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6430                .setRow(a)
6431                .setFamily(fam1)
6432                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6433                .setType(Cell.Type.Put)
6434                .build()),
6435            new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6436                .setRow(b)
6437                .setFamily(fam1)
6438                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6439                .setType(Cell.Type.Put)
6440                .build()),
6441        };
6442
6443        // this will wait for the row lock, and it will eventually succeed
6444        OperationStatus[] status = region.batchMutate(mutations);
6445        assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6446        assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode());
6447        return null;
6448      }
6449    });
6450
6451    f1.get();
6452    f2.get();
6453
6454    CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
6455  }
6456
6457  @Test
6458  public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
6459    region = initHRegion(tableName, method, CONF, fam1);
6460    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6461    EnvironmentEdgeManager.injectEdge(edge);
6462
6463    edge.setValue(10);
6464    Put p = new Put(row);
6465    p.setDurability(Durability.SKIP_WAL);
6466    p.addColumn(fam1, qual1, qual1);
6467    region.put(p);
6468
6469    Result result = region.get(new Get(row));
6470    Cell c = result.getColumnLatestCell(fam1, qual1);
6471    assertNotNull(c);
6472    assertEquals(10L, c.getTimestamp());
6473
6474    edge.setValue(1); // clock goes back
6475    p = new Put(row);
6476    p.setDurability(Durability.SKIP_WAL);
6477    p.addColumn(fam1, qual1, qual2);
6478    RowMutations rm = new RowMutations(row);
6479    rm.add(p);
6480    assertTrue(region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL,
6481        new BinaryComparator(qual1), rm));
6482    result = region.get(new Get(row));
6483    c = result.getColumnLatestCell(fam1, qual1);
6484    assertEquals(10L, c.getTimestamp());
6485    LOG.info("c value " +
6486      Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6487
6488    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6489      qual2, 0, qual2.length));
6490  }
6491
6492  HRegion initHRegion(TableName tableName, String callingMethod,
6493      byte[]... families) throws IOException {
6494    return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
6495        families);
6496  }
6497
6498  /**
6499   * HBASE-16429 Make sure no stuck if roll writer when ring buffer is filled with appends
6500   * @throws IOException if IO error occurred during test
6501   */
6502  @Test
6503  public void testWritesWhileRollWriter() throws IOException {
6504    int testCount = 10;
6505    int numRows = 1024;
6506    int numFamilies = 2;
6507    int numQualifiers = 2;
6508    final byte[][] families = new byte[numFamilies][];
6509    for (int i = 0; i < numFamilies; i++) {
6510      families[i] = Bytes.toBytes("family" + i);
6511    }
6512    final byte[][] qualifiers = new byte[numQualifiers][];
6513    for (int i = 0; i < numQualifiers; i++) {
6514      qualifiers[i] = Bytes.toBytes("qual" + i);
6515    }
6516
6517    CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
6518    this.region = initHRegion(tableName, method, CONF, families);
6519    try {
6520      List<Thread> threads = new ArrayList<>();
6521      for (int i = 0; i < numRows; i++) {
6522        final int count = i;
6523        Thread t = new Thread(new Runnable() {
6524
6525          @Override
6526          public void run() {
6527            byte[] row = Bytes.toBytes("row" + count);
6528            Put put = new Put(row);
6529            put.setDurability(Durability.SYNC_WAL);
6530            byte[] value = Bytes.toBytes(String.valueOf(count));
6531            for (byte[] family : families) {
6532              for (byte[] qualifier : qualifiers) {
6533                put.addColumn(family, qualifier, count, value);
6534              }
6535            }
6536            try {
6537              region.put(put);
6538            } catch (IOException e) {
6539              throw new RuntimeException(e);
6540            }
6541          }
6542        });
6543        threads.add(t);
6544      }
6545      for (Thread t : threads) {
6546        t.start();
6547      }
6548
6549      for (int i = 0; i < testCount; i++) {
6550        region.getWAL().rollWriter();
6551        Thread.yield();
6552      }
6553    } finally {
6554      try {
6555        HBaseTestingUtility.closeRegionAndWAL(this.region);
6556        CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
6557      } catch (DroppedSnapshotException dse) {
6558        // We could get this on way out because we interrupt the background flusher and it could
6559        // fail anywhere causing a DSE over in the background flusher... only it is not properly
6560        // dealt with so could still be memory hanging out when we get to here -- memory we can't
6561        // flush because the accounting is 'off' since original DSE.
6562      }
6563      this.region = null;
6564    }
6565  }
6566
6567  @Test
6568  public void testMutateRow_WriteRequestCount() throws Exception {
6569    byte[] row1 = Bytes.toBytes("row1");
6570    byte[] fam1 = Bytes.toBytes("fam1");
6571    byte[] qf1 = Bytes.toBytes("qualifier");
6572    byte[] val1 = Bytes.toBytes("value1");
6573
6574    RowMutations rm = new RowMutations(row1);
6575    Put put = new Put(row1);
6576    put.addColumn(fam1, qf1, val1);
6577    rm.add(put);
6578
6579    this.region = initHRegion(tableName, method, CONF, fam1);
6580    long wrcBeforeMutate = this.region.writeRequestsCount.longValue();
6581    this.region.mutateRow(rm);
6582    long wrcAfterMutate = this.region.writeRequestsCount.longValue();
6583    Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate);
6584  }
6585
6586  @Test
6587  public void testBulkLoadReplicationEnabled() throws IOException {
6588    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
6589    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
6590    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6591
6592    TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
6593      new TableDescriptorBuilder.ModifyableTableDescriptor(
6594        TableName.valueOf(name.getMethodName()));
6595    tableDescriptor.setColumnFamily(
6596      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam1));
6597    RegionInfo hri = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
6598    region = HRegion.openHRegion(hri, tableDescriptor, rss.getWAL(hri),
6599      TEST_UTIL.getConfiguration(), rss, null);
6600
6601    assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false));
6602    String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
6603    String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();
6604    assertTrue(plugins.contains(replicationCoprocessorClass));
6605    assertTrue(region.getCoprocessorHost().
6606        getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
6607  }
6608
6609  /**
6610   * The same as HRegion class, the only difference is that instantiateHStore will
6611   * create a different HStore - HStoreForTesting. [HBASE-8518]
6612   */
6613  public static class HRegionForTesting extends HRegion {
6614
6615    public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs,
6616                             final Configuration confParam, final RegionInfo regionInfo,
6617                             final TableDescriptor htd, final RegionServerServices rsServices) {
6618      this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
6619          wal, confParam, htd, rsServices);
6620    }
6621
6622    public HRegionForTesting(HRegionFileSystem fs, WAL wal,
6623                             Configuration confParam, TableDescriptor htd,
6624                             RegionServerServices rsServices) {
6625      super(fs, wal, confParam, htd, rsServices);
6626    }
6627
6628    /**
6629     * Create HStore instance.
6630     * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting.
6631     */
6632    @Override
6633    protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
6634        throws IOException {
6635      if (family.isMobEnabled()) {
6636        if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
6637          throw new IOException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS +
6638              " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY +
6639              " accordingly.");
6640        }
6641        return new HMobStore(this, family, this.conf, warmup);
6642      }
6643      return new HStoreForTesting(this, family, this.conf, warmup);
6644    }
6645  }
6646
6647  /**
6648   * HStoreForTesting is merely the same as HStore, the difference is in the doCompaction method
6649   * of HStoreForTesting there is a checkpoint "hbase.hstore.compaction.complete" which
6650   * doesn't let hstore compaction complete. In the former edition, this config is set in
6651   * HStore class inside compact method, though this is just for testing, otherwise it
6652   * doesn't do any help. In HBASE-8518, we try to get rid of all "hbase.hstore.compaction.complete"
6653   * config (except for testing code).
6654   */
6655  public static class HStoreForTesting extends HStore {
6656
6657    protected HStoreForTesting(final HRegion region,
6658        final ColumnFamilyDescriptor family,
6659        final Configuration confParam, boolean warmup) throws IOException {
6660      super(region, family, confParam, warmup);
6661    }
6662
6663    @Override
6664    protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
6665        Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
6666        List<Path> newFiles) throws IOException {
6667      // let compaction incomplete.
6668      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
6669        LOG.warn("hbase.hstore.compaction.complete is set to false");
6670        List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
6671        final boolean evictOnClose =
6672            cacheConf != null? cacheConf.shouldEvictOnClose(): true;
6673        for (Path newFile : newFiles) {
6674          // Create storefile around what we wrote with a reader on it.
6675          HStoreFile sf = createStoreFileAndReader(newFile);
6676          sf.closeStoreFile(evictOnClose);
6677          sfs.add(sf);
6678        }
6679        return sfs;
6680      }
6681      return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles);
6682    }
6683  }
6684}