001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
022import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
023import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
024import static org.junit.Assert.assertArrayEquals;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertFalse;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertNull;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031import static org.mockito.ArgumentMatchers.any;
032import static org.mockito.ArgumentMatchers.anyBoolean;
033import static org.mockito.ArgumentMatchers.anyLong;
034import static org.mockito.Mockito.doThrow;
035import static org.mockito.Mockito.mock;
036import static org.mockito.Mockito.never;
037import static org.mockito.Mockito.spy;
038import static org.mockito.Mockito.times;
039import static org.mockito.Mockito.verify;
040import static org.mockito.Mockito.when;
041
042import java.io.IOException;
043import java.io.InterruptedIOException;
044import java.lang.reflect.Field;
045import java.math.BigDecimal;
046import java.nio.charset.StandardCharsets;
047import java.security.PrivilegedExceptionAction;
048import java.util.ArrayList;
049import java.util.Arrays;
050import java.util.Collection;
051import java.util.List;
052import java.util.Map;
053import java.util.NavigableMap;
054import java.util.Objects;
055import java.util.TreeMap;
056import java.util.concurrent.BlockingQueue;
057import java.util.concurrent.Callable;
058import java.util.concurrent.CountDownLatch;
059import java.util.concurrent.ExecutorService;
060import java.util.concurrent.Executors;
061import java.util.concurrent.Future;
062import java.util.concurrent.ScheduledExecutorService;
063import java.util.concurrent.ThreadPoolExecutor;
064import java.util.concurrent.TimeUnit;
065import java.util.concurrent.atomic.AtomicBoolean;
066import java.util.concurrent.atomic.AtomicInteger;
067import java.util.concurrent.atomic.AtomicReference;
068import org.apache.commons.lang3.RandomStringUtils;
069import org.apache.hadoop.conf.Configuration;
070import org.apache.hadoop.fs.FSDataOutputStream;
071import org.apache.hadoop.fs.FileStatus;
072import org.apache.hadoop.fs.FileSystem;
073import org.apache.hadoop.fs.Path;
074import org.apache.hadoop.hbase.ArrayBackedTag;
075import org.apache.hadoop.hbase.Cell;
076import org.apache.hadoop.hbase.Cell.Type;
077import org.apache.hadoop.hbase.CellBuilderFactory;
078import org.apache.hadoop.hbase.CellBuilderType;
079import org.apache.hadoop.hbase.CellUtil;
080import org.apache.hadoop.hbase.CompareOperator;
081import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
082import org.apache.hadoop.hbase.DroppedSnapshotException;
083import org.apache.hadoop.hbase.HBaseClassTestRule;
084import org.apache.hadoop.hbase.HBaseConfiguration;
085import org.apache.hadoop.hbase.HBaseTestingUtility;
086import org.apache.hadoop.hbase.HColumnDescriptor;
087import org.apache.hadoop.hbase.HConstants;
088import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
089import org.apache.hadoop.hbase.HDFSBlocksDistribution;
090import org.apache.hadoop.hbase.HRegionInfo;
091import org.apache.hadoop.hbase.HTableDescriptor;
092import org.apache.hadoop.hbase.KeyValue;
093import org.apache.hadoop.hbase.MiniHBaseCluster;
094import org.apache.hadoop.hbase.MultithreadedTestUtil;
095import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
096import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
097import org.apache.hadoop.hbase.NotServingRegionException;
098import org.apache.hadoop.hbase.PrivateCellUtil;
099import org.apache.hadoop.hbase.RegionTooBusyException;
100import org.apache.hadoop.hbase.ServerName;
101import org.apache.hadoop.hbase.StartMiniClusterOption;
102import org.apache.hadoop.hbase.TableName;
103import org.apache.hadoop.hbase.TagType;
104import org.apache.hadoop.hbase.Waiter;
105import org.apache.hadoop.hbase.client.Append;
106import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
107import org.apache.hadoop.hbase.client.Delete;
108import org.apache.hadoop.hbase.client.Durability;
109import org.apache.hadoop.hbase.client.Get;
110import org.apache.hadoop.hbase.client.Increment;
111import org.apache.hadoop.hbase.client.Mutation;
112import org.apache.hadoop.hbase.client.Put;
113import org.apache.hadoop.hbase.client.RegionInfo;
114import org.apache.hadoop.hbase.client.RegionInfoBuilder;
115import org.apache.hadoop.hbase.client.Result;
116import org.apache.hadoop.hbase.client.RowMutations;
117import org.apache.hadoop.hbase.client.Scan;
118import org.apache.hadoop.hbase.client.Table;
119import org.apache.hadoop.hbase.client.TableDescriptor;
120import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
121import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
122import org.apache.hadoop.hbase.filter.BigDecimalComparator;
123import org.apache.hadoop.hbase.filter.BinaryComparator;
124import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
125import org.apache.hadoop.hbase.filter.Filter;
126import org.apache.hadoop.hbase.filter.FilterBase;
127import org.apache.hadoop.hbase.filter.FilterList;
128import org.apache.hadoop.hbase.filter.NullComparator;
129import org.apache.hadoop.hbase.filter.PrefixFilter;
130import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
131import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
132import org.apache.hadoop.hbase.filter.SubstringComparator;
133import org.apache.hadoop.hbase.filter.ValueFilter;
134import org.apache.hadoop.hbase.io.hfile.HFile;
135import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
136import org.apache.hadoop.hbase.monitoring.MonitoredTask;
137import org.apache.hadoop.hbase.monitoring.TaskMonitor;
138import org.apache.hadoop.hbase.regionserver.HRegion.MutationBatchOperation;
139import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
140import org.apache.hadoop.hbase.regionserver.Region.RowLock;
141import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
142import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
143import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
144import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
145import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
146import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
147import org.apache.hadoop.hbase.security.User;
148import org.apache.hadoop.hbase.test.MetricsAssertHelper;
149import org.apache.hadoop.hbase.testclassification.LargeTests;
150import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
151import org.apache.hadoop.hbase.util.Bytes;
152import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
153import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
154import org.apache.hadoop.hbase.util.FSUtils;
155import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
156import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
157import org.apache.hadoop.hbase.util.Threads;
158import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
159import org.apache.hadoop.hbase.wal.FaultyFSLog;
160import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
161import org.apache.hadoop.hbase.wal.WAL;
162import org.apache.hadoop.hbase.wal.WALEdit;
163import org.apache.hadoop.hbase.wal.WALFactory;
164import org.apache.hadoop.hbase.wal.WALKeyImpl;
165import org.apache.hadoop.hbase.wal.WALProvider;
166import org.apache.hadoop.hbase.wal.WALProvider.Writer;
167import org.apache.hadoop.hbase.wal.WALSplitUtil;
168import org.apache.hadoop.hbase.wal.WALSplitter;
169import org.apache.hadoop.metrics2.MetricsExecutor;
170import org.junit.After;
171import org.junit.Assert;
172import org.junit.Before;
173import org.junit.ClassRule;
174import org.junit.Rule;
175import org.junit.Test;
176import org.junit.experimental.categories.Category;
177import org.junit.rules.ExpectedException;
178import org.junit.rules.TestName;
179import org.mockito.ArgumentCaptor;
180import org.mockito.ArgumentMatcher;
181import org.mockito.Mockito;
182import org.mockito.invocation.InvocationOnMock;
183import org.mockito.stubbing.Answer;
184import org.slf4j.Logger;
185import org.slf4j.LoggerFactory;
186
187import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
188import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
189import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
190import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
191import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
192
193import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
194import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
195import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
196import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
197import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
198import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
199import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
200
201/**
202 * Basic stand-alone testing of HRegion.  No clusters!
203 *
204 * A lot of the meta information for an HRegion now lives inside other HRegions
205 * or in the HBaseMaster, so only basic testing is possible.
206 */
207@Category({VerySlowRegionServerTests.class, LargeTests.class})
208@SuppressWarnings("deprecation")
209public class TestHRegion {
210
211  @ClassRule
212  public static final HBaseClassTestRule CLASS_RULE =
213      HBaseClassTestRule.forClass(TestHRegion.class);
214
215  // Do not spin up clusters in here. If you need to spin up a cluster, do it
216  // over in TestHRegionOnCluster.
217  private static final Logger LOG = LoggerFactory.getLogger(TestHRegion.class);
218  @Rule
219  public TestName name = new TestName();
220  @Rule public final ExpectedException thrown = ExpectedException.none();
221
222  private static final String COLUMN_FAMILY = "MyCF";
223  private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
224  private static final EventLoopGroup GROUP = new NioEventLoopGroup();
225
226  HRegion region = null;
  // Do not run unit tests in parallel (why not? It doesn't work? St.Ack)
228  protected static HBaseTestingUtility TEST_UTIL;
  public static Configuration CONF;
230  private String dir;
231  private final int MAX_VERSIONS = 2;
232
233  // Test names
234  protected TableName tableName;
235  protected String method;
236  protected final byte[] qual = Bytes.toBytes("qual");
237  protected final byte[] qual1 = Bytes.toBytes("qual1");
238  protected final byte[] qual2 = Bytes.toBytes("qual2");
239  protected final byte[] qual3 = Bytes.toBytes("qual3");
240  protected final byte[] value = Bytes.toBytes("value");
241  protected final byte[] value1 = Bytes.toBytes("value1");
242  protected final byte[] value2 = Bytes.toBytes("value2");
243  protected final byte[] row = Bytes.toBytes("rowA");
244  protected final byte[] row2 = Bytes.toBytes("rowB");
245
246  protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory
247      .getInstance(MetricsAssertHelper.class);
248
249  @Before
250  public void setup() throws IOException {
251    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
252    CONF = TEST_UTIL.getConfiguration();
253    NettyAsyncFSWALConfigHelper.setEventLoopConfig(CONF, GROUP, NioSocketChannel.class);
254    dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
255    method = name.getMethodName();
256    tableName = TableName.valueOf(method);
257    CONF.set(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, String.valueOf(0.09));
258  }
259
260  @After
261  public void tearDown() throws IOException {
    // The region may already have been closed, but there is no harm in closing it again here using HTU.
263    HBaseTestingUtility.closeRegionAndWAL(region);
264    EnvironmentEdgeManagerTestHelper.reset();
265    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
266    TEST_UTIL.cleanupTestDir();
267  }
268
269  /**
270   * Test that I can use the max flushed sequence id after the close.
271   * @throws IOException
272   */
273  @Test
274  public void testSequenceId() throws IOException {
275    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
276    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
277    // Weird. This returns 0 if no store files or no edits. Afraid to change it.
278    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
279    HBaseTestingUtility.closeRegionAndWAL(this.region);
280    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
281    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
282    // Open region again.
283    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
284    byte [] value = Bytes.toBytes(method);
    // Make a put against our cf, using the method-name bytes as both row and value.
286    Put put = new Put(value);
287    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
288    region.put(put);
289    // No flush yet so init numbers should still be in place.
290    assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
291    assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
292    region.flush(true);
293    long max = region.getMaxFlushedSeqId();
294    HBaseTestingUtility.closeRegionAndWAL(this.region);
295    assertEquals(max, region.getMaxFlushedSeqId());
296    this.region = null;
297  }
298
299  /**
300   * Test for Bug 2 of HBASE-10466.
301   * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
302   * is smaller than a certain value, or when region close starts a flush is ongoing, the first
303   * flush is skipped and only the second flush takes place. However, two flushes are required in
304   * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
305   * in current memstore. The fix is removing all conditions except abort check so we ensure 2
306   * flushes for region close."
307   * @throws IOException
308   */
309  @Test
310  public void testCloseCarryingSnapshot() throws IOException {
311    region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
312    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
    // Get some bytes to use as a value (the method name).
    byte [] value = Bytes.toBytes(method);
    // Make a put against our cf, using those bytes as both row and value.
316    Put put = new Put(value);
317    put.addColumn(COLUMN_FAMILY_BYTES, null, value);
318    // First put something in current memstore, which will be in snapshot after flusher.prepare()
319    region.put(put);
320    StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
321    storeFlushCtx.prepare();
322    // Second put something in current memstore
323    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
324    region.put(put);
325    // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
326    HBaseTestingUtility.closeRegionAndWAL(region);
327    assertEquals(0, region.getMemStoreDataSize());
328    region = null;
329  }
330
  /**
   * Verifies that the memstore snapshot size is correctly updated in case of a rollback.
   * See HBASE-10845.
   */
335  @Test
336  public void testMemstoreSnapshotSize() throws IOException {
337    class MyFaultyFSLog extends FaultyFSLog {
338      StoreFlushContext storeFlushCtx;
339      public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
340          throws IOException {
341        super(fs, rootDir, logName, conf);
342      }
343
344      void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
345        this.storeFlushCtx = storeFlushCtx;
346      }
347
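      // Take the store's flush snapshot from inside sync() so that the forced sync failure
      // happens while a flush snapshot is outstanding, exercising the rollback accounting path
      // described in HBASE-10845.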
348      @Override
349      public void sync(long txid) throws IOException {
350        storeFlushCtx.prepare();
351        super.sync(txid);
352      }
353    }
354
355    FileSystem fs = FileSystem.get(CONF);
356    Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
357    MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
358    faultyLog.init();
359    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, faultyLog,
360        COLUMN_FAMILY_BYTES);
361
362    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
    // Get some bytes to use as a value (the method name).
364    byte [] value = Bytes.toBytes(method);
365    faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY));
366
367    Put put = new Put(value);
368    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
369    faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
370    boolean threwIOE = false;
371    try {
372      region.put(put);
373    } catch (IOException ioe) {
374      threwIOE = true;
375    } finally {
376      assertTrue("The regionserver should have thrown an exception", threwIOE);
377    }
378    MemStoreSize mss = store.getFlushableSize();
379    assertTrue("flushable size should be zero, but it is " + mss,
380        mss.getDataSize() == 0);
381  }
382
383  /**
384   * Create a WAL outside of the usual helper in
385   * {@link HBaseTestingUtility#createWal(Configuration, Path, RegionInfo)} because that method
386   * doesn't play nicely with FaultyFileSystem. Call this method before overriding
387   * {@code fs.file.impl}.
388   * @param callingMethod a unique component for the path, probably the name of the test method.
389   */
390  private static WAL createWALCompatibleWithFaultyFileSystem(String callingMethod,
391      Configuration conf, TableName tableName) throws IOException {
392    final Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
393    final Configuration walConf = new Configuration(conf);
394    FSUtils.setRootDir(walConf, logDir);
395    return new WALFactory(walConf, callingMethod)
396        .getWAL(RegionInfoBuilder.newBuilder(tableName).build());
397  }
398
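  /**
   * An exception thrown from the postBatchMutate coprocessor hook must not corrupt memstore
   * accounting: the mutation has already been applied, so both the region memstore size and the
   * store's flushable size should still account for it.
   */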
399  @Test
400  public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
401    String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
402    FileSystem fs = FileSystem.get(CONF);
403    Path rootDir = new Path(dir + testName);
404    FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
405    hLog.init();
406    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
407        COLUMN_FAMILY_BYTES);
408    HStore store = region.getStore(COLUMN_FAMILY_BYTES);
409    assertEquals(0, region.getMemStoreDataSize());
410
411    // Put one value
412    byte [] value = Bytes.toBytes(method);
413    Put put = new Put(value);
414    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
415    region.put(put);
416    long onePutSize = region.getMemStoreDataSize();
417    assertTrue(onePutSize > 0);
418
419    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
420    doThrow(new IOException())
421       .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
422    region.setCoprocessorHost(mockedCPHost);
423
424    put = new Put(value);
425    put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
426    try {
427      region.put(put);
428      fail("Should have failed with IOException");
429    } catch (IOException expected) {
430    }
431    long expectedSize = onePutSize * 2;
432    assertEquals("memstoreSize should be incremented",
433        expectedSize, region.getMemStoreDataSize());
434    assertEquals("flushable size should be incremented",
435        expectedSize, store.getFlushableSize().getDataSize());
436
437    region.setCoprocessorHost(null);
438  }
439
  /**
   * A test case for HBASE-21041: memstore size accounting after a flush.
   * @throws Exception if the test fails
   */
444  @Test
445  public void testFlushAndMemstoreSizeCounting() throws Exception {
446    byte[] family = Bytes.toBytes("family");
447    this.region = initHRegion(tableName, method, CONF, family);
448    final WALFactory wals = new WALFactory(CONF, method);
449    try {
450      for (byte[] row : HBaseTestingUtility.ROWS) {
451        Put put = new Put(row);
452        put.addColumn(family, family, row);
453        region.put(put);
454      }
455      region.flush(true);
456      // After flush, data size should be zero
457      assertEquals(0, region.getMemStoreDataSize());
      // After flush, a new active mutable segment is created, so the heap size
      // should equal MutableSegment.DEEP_OVERHEAD
460      assertEquals(MutableSegment.DEEP_OVERHEAD, region.getMemStoreHeapSize());
461      // After flush, offheap should be zero
462      assertEquals(0, region.getMemStoreOffHeapSize());
463    } finally {
464      HBaseTestingUtility.closeRegionAndWAL(this.region);
465      this.region = null;
466      wals.close();
467    }
468  }
469
470  /**
471   * Test we do not lose data if we fail a flush and then close.
   * Part of HBASE-10466.  Tests the following from the issue description:
473   * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
474   * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
475   * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
476   * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
477   * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
478   * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
479   * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
480   * much smaller than expected. In extreme case, if the error accumulates to even bigger than
481   * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
482   * if memstoreSize is not larger than 0."
483   * @throws Exception
484   */
485  @Test
486  public void testFlushSizeAccounting() throws Exception {
487    final Configuration conf = HBaseConfiguration.create(CONF);
488    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
489    // Only retry once.
490    conf.setInt("hbase.hstore.flush.retries.number", 1);
491    final User user =
492      User.createUserForTesting(conf, method, new String[]{"foo"});
493    // Inject our faulty LocalFileSystem
494    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
495    user.runAs(new PrivilegedExceptionAction<Object>() {
496      @Override
497      public Object run() throws Exception {
498        // Make sure it worked (above is sensitive to caching details in hadoop core)
499        FileSystem fs = FileSystem.get(conf);
500        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
501        FaultyFileSystem ffs = (FaultyFileSystem)fs;
502        HRegion region = null;
503        try {
504          // Initialize region
505          region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, wal,
506              COLUMN_FAMILY_BYTES);
507          long size = region.getMemStoreDataSize();
508          Assert.assertEquals(0, size);
509          // Put one item into memstore.  Measure the size of one item in memstore.
510          Put p1 = new Put(row);
511          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[]) null));
512          region.put(p1);
513          final long sizeOfOnePut = region.getMemStoreDataSize();
514          // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
515          try {
516            LOG.info("Flushing");
517            region.flush(true);
518            Assert.fail("Didn't bubble up IOE!");
519          } catch (DroppedSnapshotException dse) {
520            // What we are expecting
521            region.closing.set(false); // this is needed for the rest of the test to work
522          }
523          // Make it so all writes succeed from here on out
524          ffs.fault.set(false);
525          // Check sizes.  Should still be the one entry.
526          Assert.assertEquals(sizeOfOnePut, region.getMemStoreDataSize());
527          // Now add two entries so that on this next flush that fails, we can see if we
528          // subtract the right amount, the snapshot size only.
529          Put p2 = new Put(row);
530          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
531          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
532          region.put(p2);
533          long expectedSize = sizeOfOnePut * 3;
534          Assert.assertEquals(expectedSize, region.getMemStoreDataSize());
          // Do a successful flush.  It will clear the snapshot only.  That's how flushes work.
536          // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
537          // it
538          region.flush(true);
539          // Make sure our memory accounting is right.
540          Assert.assertEquals(sizeOfOnePut * 2, region.getMemStoreDataSize());
541        } finally {
542          HBaseTestingUtility.closeRegionAndWAL(region);
543        }
544        return null;
545      }
546    });
547    FileSystem.closeAllForUGI(user.getUGI());
548  }
549
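  /**
   * Closing a region while its flush fails (via the faulty filesystem) should surface a
   * DroppedSnapshotException from the close rather than silently dropping the snapshot data.
   */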
550  @Test
551  public void testCloseWithFailingFlush() throws Exception {
552    final Configuration conf = HBaseConfiguration.create(CONF);
553    final WAL wal = createWALCompatibleWithFaultyFileSystem(method, conf, tableName);
554    // Only retry once.
555    conf.setInt("hbase.hstore.flush.retries.number", 1);
556    final User user =
557      User.createUserForTesting(conf, this.method, new String[]{"foo"});
558    // Inject our faulty LocalFileSystem
559    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
560    user.runAs(new PrivilegedExceptionAction<Object>() {
561      @Override
562      public Object run() throws Exception {
563        // Make sure it worked (above is sensitive to caching details in hadoop core)
564        FileSystem fs = FileSystem.get(conf);
565        Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
566        FaultyFileSystem ffs = (FaultyFileSystem)fs;
567        HRegion region = null;
568        try {
569          // Initialize region
570          region = initHRegion(tableName, null, null, false,
571              Durability.SYNC_WAL, wal, COLUMN_FAMILY_BYTES);
572          long size = region.getMemStoreDataSize();
573          Assert.assertEquals(0, size);
574          // Put one item into memstore.  Measure the size of one item in memstore.
575          Put p1 = new Put(row);
576          p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
577          region.put(p1);
578          // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
579          HStore store = region.getStore(COLUMN_FAMILY_BYTES);
580          StoreFlushContext storeFlushCtx =
581              store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
582          storeFlushCtx.prepare();
583          // Now add two entries to the foreground memstore.
584          Put p2 = new Put(row);
585          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
586          p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
587          region.put(p2);
588          // Now try close on top of a failing flush.
589          HBaseTestingUtility.closeRegionAndWAL(region);
590          region = null;
591          fail();
592        } catch (DroppedSnapshotException dse) {
593          // Expected
594          LOG.info("Expected DroppedSnapshotException");
595        } finally {
          // Make it so all writes succeed from here on out so we can close cleanly
597          ffs.fault.set(false);
598          HBaseTestingUtility.closeRegionAndWAL(region);
599        }
600        return null;
601      }
602    });
603    FileSystem.closeAllForUGI(user.getUGI());
604  }
605
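  /**
   * Scanners opened before and after a delete, flush and major compaction should each keep
   * seeing data as of their own read point: the scanner opened before the delete still returns
   * the row after the compaction, while the later scanners return nothing.
   */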
606  @Test
607  public void testCompactionAffectedByScanners() throws Exception {
608    byte[] family = Bytes.toBytes("family");
609    this.region = initHRegion(tableName, method, CONF, family);
610
611    Put put = new Put(Bytes.toBytes("r1"));
612    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
613    region.put(put);
614    region.flush(true);
615
616    Scan scan = new Scan();
617    scan.setMaxVersions(3);
618    // open the first scanner
619    RegionScanner scanner1 = region.getScanner(scan);
620
621    Delete delete = new Delete(Bytes.toBytes("r1"));
622    region.delete(delete);
623    region.flush(true);
624
625    // open the second scanner
626    RegionScanner scanner2 = region.getScanner(scan);
627
628    List<Cell> results = new ArrayList<>();
629
630    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
631
632    // make a major compaction
633    region.compact(true);
634
635    // open the third scanner
636    RegionScanner scanner3 = region.getScanner(scan);
637
638    // get data from scanner 1, 2, 3 after major compaction
639    scanner1.next(results);
640    System.out.println(results);
641    assertEquals(1, results.size());
642
643    results.clear();
644    scanner2.next(results);
645    System.out.println(results);
646    assertEquals(0, results.size());
647
648    results.clear();
649    scanner3.next(results);
650    System.out.println(results);
651    assertEquals(0, results.size());
652  }
653
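  /**
   * Reseeking an already-open scanner after a major compaction must not throw a
   * NullPointerException and should still position the scanner on the requested row.
   */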
654  @Test
655  public void testToShowNPEOnRegionScannerReseek() throws Exception {
656    byte[] family = Bytes.toBytes("family");
657    this.region = initHRegion(tableName, method, CONF, family);
658
659    Put put = new Put(Bytes.toBytes("r1"));
660    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
661    region.put(put);
662    put = new Put(Bytes.toBytes("r2"));
663    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
664    region.put(put);
665    region.flush(true);
666
667    Scan scan = new Scan();
668    scan.setMaxVersions(3);
669    // open the first scanner
670    RegionScanner scanner1 = region.getScanner(scan);
671
672    System.out.println("Smallest read point:" + region.getSmallestReadPoint());
673
674    region.compact(true);
675
676    scanner1.reseek(Bytes.toBytes("r2"));
677    List<Cell> results = new ArrayList<>();
678    scanner1.next(results);
679    Cell keyValue = results.get(0);
680    Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0);
681    scanner1.close();
682  }
683
684  @Test
685  public void testSkipRecoveredEditsReplay() throws Exception {
686    byte[] family = Bytes.toBytes("family");
687    this.region = initHRegion(tableName, method, CONF, family);
688    final WALFactory wals = new WALFactory(CONF, method);
689    try {
690      Path regiondir = region.getRegionFileSystem().getRegionDir();
691      FileSystem fs = region.getRegionFileSystem().getFileSystem();
692      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
693
694      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
695
696      long maxSeqId = 1050;
697      long minSeqId = 1000;
698
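      // Write one recovered-edits file per sequence id (stepping by 10), each containing a
      // single Put for that id; since the stores' max flushed id is set below minSeqId, all of
      // these edits should be replayed.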
699      for (long i = minSeqId; i <= maxSeqId; i += 10) {
700        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
701        fs.create(recoveredEdits);
702        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
703
704        long time = System.nanoTime();
705        WALEdit edit = new WALEdit();
706        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
707            .toBytes(i)));
708        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
709            HConstants.DEFAULT_CLUSTER_ID), edit));
710
711        writer.close();
712      }
713      MonitoredTask status = TaskMonitor.get().createStatus(method);
714      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
715      for (HStore store : region.getStores()) {
716        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
717      }
718      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
719      assertEquals(maxSeqId, seqId);
720      region.getMVCC().advanceTo(seqId);
721      Get get = new Get(row);
722      Result result = region.get(get);
723      for (long i = minSeqId; i <= maxSeqId; i += 10) {
724        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
725        assertEquals(1, kvs.size());
726        assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
727      }
728    } finally {
729      HBaseTestingUtility.closeRegionAndWAL(this.region);
730      this.region = null;
731      wals.close();
732    }
733  }
734
735  @Test
736  public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
737    byte[] family = Bytes.toBytes("family");
738    this.region = initHRegion(tableName, method, CONF, family);
739    final WALFactory wals = new WALFactory(CONF, method);
740    try {
741      Path regiondir = region.getRegionFileSystem().getRegionDir();
742      FileSystem fs = region.getRegionFileSystem().getFileSystem();
743      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
744
745      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
746
747      long maxSeqId = 1050;
748      long minSeqId = 1000;
749
750      for (long i = minSeqId; i <= maxSeqId; i += 10) {
751        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
752        fs.create(recoveredEdits);
753        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
754
755        long time = System.nanoTime();
756        WALEdit edit = new WALEdit();
757        edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
758            .toBytes(i)));
759        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
760            HConstants.DEFAULT_CLUSTER_ID), edit));
761
762        writer.close();
763      }
764      long recoverSeqId = 1030;
765      MonitoredTask status = TaskMonitor.get().createStatus(method);
766      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
767      for (HStore store : region.getStores()) {
768        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
769      }
770      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
771      assertEquals(maxSeqId, seqId);
772      region.getMVCC().advanceTo(seqId);
773      Get get = new Get(row);
774      Result result = region.get(get);
775      for (long i = minSeqId; i <= maxSeqId; i += 10) {
776        List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
777        if (i < recoverSeqId) {
778          assertEquals(0, kvs.size());
779        } else {
780          assertEquals(1, kvs.size());
781          assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
782        }
783      }
784    } finally {
785      HBaseTestingUtility.closeRegionAndWAL(this.region);
786      this.region = null;
787      wals.close();
788    }
789  }
790
791  @Test
792  public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
793    byte[] family = Bytes.toBytes("family");
794    this.region = initHRegion(tableName, method, CONF, family);
795    Path regiondir = region.getRegionFileSystem().getRegionDir();
796    FileSystem fs = region.getRegionFileSystem().getFileSystem();
797
798    Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
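    // These files are named with sequence ids below the stores' max flushed id, so replay
    // should skip them based on the file name alone, never parsing their (deliberately
    // invalid) contents.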
799    for (int i = 1000; i < 1050; i += 10) {
800      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
801      FSDataOutputStream dos = fs.create(recoveredEdits);
802      dos.writeInt(i);
803      dos.close();
804    }
805    long minSeqId = 2000;
806    Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1));
807    FSDataOutputStream dos = fs.create(recoveredEdits);
808    dos.close();
809
810    Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
811    for (HStore store : region.getStores()) {
812      maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId);
813    }
814    long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, null);
815    assertEquals(minSeqId, seqId);
816  }
817
818  @Test
819  public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
820    byte[] family = Bytes.toBytes("family");
821    this.region = initHRegion(tableName, method, CONF, family);
822    final WALFactory wals = new WALFactory(CONF, method);
823    try {
824      Path regiondir = region.getRegionFileSystem().getRegionDir();
825      FileSystem fs = region.getRegionFileSystem().getFileSystem();
826      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
827      byte[][] columns = region.getTableDescriptor().getColumnFamilyNames().toArray(new byte[0][]);
828
829      assertEquals(0, region.getStoreFileList(columns).size());
830
831      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
832
833      long maxSeqId = 1050;
834      long minSeqId = 1000;
835
836      for (long i = minSeqId; i <= maxSeqId; i += 10) {
837        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
838        fs.create(recoveredEdits);
839        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
840
841        long time = System.nanoTime();
842        WALEdit edit = null;
843        if (i == maxSeqId) {
          edit = WALEdit.createCompaction(region.getRegionInfo(),
              CompactionDescriptor.newBuilder()
                  .setTableName(ByteString.copyFrom(tableName.getName()))
                  .setFamilyName(ByteString.copyFrom(regionName))
                  .setEncodedRegionName(ByteString.copyFrom(regionName))
                  .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
                  .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
                  .build());
852        } else {
853          edit = new WALEdit();
854          edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
855            .toBytes(i)));
856        }
857        writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
858            HConstants.DEFAULT_CLUSTER_ID), edit));
859        writer.close();
860      }
861
862      long recoverSeqId = 1030;
863      Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
864      MonitoredTask status = TaskMonitor.get().createStatus(method);
865      for (HStore store : region.getStores()) {
866        maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
867      }
868      long seqId = region.replayRecoveredEditsIfAny(maxSeqIdInStores, null, status);
869      assertEquals(maxSeqId, seqId);
870
871      // assert that the files are flushed
872      assertEquals(1, region.getStoreFileList(columns).size());
873
874    } finally {
875      HBaseTestingUtility.closeRegionAndWAL(this.region);
876      this.region = null;
877      wals.close();
878    }
879  }
880
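  /**
   * Replaying a compaction marker found in recovered edits should reconcile the store files,
   * leaving only the compacted file, when the encoded region name matches; a marker carrying a
   * mismatched encoded region name must not prevent the region from opening.
   */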
881  @Test
882  public void testRecoveredEditsReplayCompaction() throws Exception {
883    testRecoveredEditsReplayCompaction(false);
884    testRecoveredEditsReplayCompaction(true);
885  }
886
887  public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
888    CONF.setClass(HConstants.REGION_IMPL, HRegionForTesting.class, Region.class);
889    byte[] family = Bytes.toBytes("family");
890    this.region = initHRegion(tableName, method, CONF, family);
891    final WALFactory wals = new WALFactory(CONF, method);
892    try {
893      Path regiondir = region.getRegionFileSystem().getRegionDir();
894      FileSystem fs = region.getRegionFileSystem().getFileSystem();
895      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
896
897      long maxSeqId = 3;
898      long minSeqId = 0;
899
900      for (long i = minSeqId; i < maxSeqId; i++) {
901        Put put = new Put(Bytes.toBytes(i));
902        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
903        region.put(put);
904        region.flush(true);
905      }
906
907      // this will create a region with 3 files
908      assertEquals(3, region.getStore(family).getStorefilesCount());
909      List<Path> storeFiles = new ArrayList<>(3);
910      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
911        storeFiles.add(sf.getPath());
912      }
913
914      // disable compaction completion
915      CONF.setBoolean("hbase.hstore.compaction.complete", false);
916      region.compactStores();
917
918      // ensure that nothing changed
919      assertEquals(3, region.getStore(family).getStorefilesCount());
920
921      // now find the compacted file, and manually add it to the recovered edits
922      Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
923      FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
924      String errorMsg = "Expected to find 1 file in the region temp directory "
925          + "from the compaction, could not find any";
926      assertNotNull(errorMsg, files);
927      assertEquals(errorMsg, 1, files.length);
928      // move the file inside region dir
929      Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
930          files[0].getPath());
931
932      byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes();
933      byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length];
934      for (int i=0; i < encodedNameAsBytes.length; i++) {
935        // Mix the byte array to have a new encodedName
936        fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1);
937      }
938
939      CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
940        .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family,
941            storeFiles, Lists.newArrayList(newFile),
942            region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
943
944      WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
945          this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
946
947      Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
948
949      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
950      fs.create(recoveredEdits);
951      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
952
953      long time = System.nanoTime();
954
955      writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time,
956          HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
957          compactionDescriptor)));
958      writer.close();
959
960      // close the region now, and reopen again
961      region.getTableDescriptor();
962      region.getRegionInfo();
963      HBaseTestingUtility.closeRegionAndWAL(this.region);
964      try {
965        region = HRegion.openHRegion(region, null);
966      } catch (WrongRegionException wre) {
967        fail("Matching encoded region name should not have produced WrongRegionException");
968      }
969
970      // now check whether we have only one store file, the compacted one
971      Collection<HStoreFile> sfs = region.getStore(family).getStorefiles();
972      for (HStoreFile sf : sfs) {
973        LOG.info(Objects.toString(sf.getPath()));
974      }
975      if (!mismatchedRegionName) {
976        assertEquals(1, region.getStore(family).getStorefilesCount());
977      }
978      files = FSUtils.listStatus(fs, tmpDir);
979      assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);
980
981      for (long i = minSeqId; i < maxSeqId; i++) {
982        Get get = new Get(Bytes.toBytes(i));
983        Result result = region.get(get);
984        byte[] value = result.getValue(family, Bytes.toBytes(i));
985        assertArrayEquals(Bytes.toBytes(i), value);
986      }
987    } finally {
988      HBaseTestingUtility.closeRegionAndWAL(this.region);
989      this.region = null;
990      wals.close();
991      CONF.setClass(HConstants.REGION_IMPL, HRegion.class, Region.class);
992    }
993  }
994
995  @Test
996  public void testFlushMarkers() throws Exception {
    // Tests that flush markers are written to the WAL and handled during recovered-edits replay.
998    byte[] family = Bytes.toBytes("family");
999    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
1000    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1001    FSUtils.setRootDir(walConf, logDir);
1002    final WALFactory wals = new WALFactory(walConf, method);
1003    final WAL wal = wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build());
1004
1005    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1006      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1007    try {
1008      Path regiondir = region.getRegionFileSystem().getRegionDir();
1009      FileSystem fs = region.getRegionFileSystem().getFileSystem();
1010      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
1011
1012      long maxSeqId = 3;
1013      long minSeqId = 0;
1014
1015      for (long i = minSeqId; i < maxSeqId; i++) {
1016        Put put = new Put(Bytes.toBytes(i));
1017        put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1018        region.put(put);
1019        region.flush(true);
1020      }
1021
1022      // this will create a region with 3 files from flush
1023      assertEquals(3, region.getStore(family).getStorefilesCount());
1024      List<String> storeFiles = new ArrayList<>(3);
1025      for (HStoreFile sf : region.getStore(family).getStorefiles()) {
1026        storeFiles.add(sf.getPath().getName());
1027      }
1028
1029      // now verify that the flush markers are written
1030      wal.shutdown();
1031      WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
1032        TEST_UTIL.getConfiguration());
1033      try {
1034        List<WAL.Entry> flushDescriptors = new ArrayList<>();
1035        long lastFlushSeqId = -1;
1036        while (true) {
1037          WAL.Entry entry = reader.next();
1038          if (entry == null) {
1039            break;
1040          }
1041          Cell cell = entry.getEdit().getCells().get(0);
1042          if (WALEdit.isMetaEditFamily(cell)) {
1043            FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
1044            assertNotNull(flushDesc);
1045            assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
1046            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1047              assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
1048            } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1049              assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
1050            }
1051            lastFlushSeqId = flushDesc.getFlushSequenceNumber();
1052            assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
1053            assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
1054            StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
1055            assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
1056            assertEquals("family", storeFlushDesc.getStoreHomeDir());
1057            if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1058              assertEquals(0, storeFlushDesc.getFlushOutputCount());
1059            } else {
1060              assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
1061              assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
1062            }
1063
1064            flushDescriptors.add(entry);
1065          }
1066        }
1067
1068        assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
1069
1070        // now write those markers to the recovered edits again.
1071
1072        Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
1073
1074        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1075        fs.create(recoveredEdits);
1076        WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1077
1078        for (WAL.Entry entry : flushDescriptors) {
1079          writer.append(entry);
1080        }
1081        writer.close();
1082      } finally {
1083        if (null != reader) {
1084          try {
1085            reader.close();
1086          } catch (IOException exception) {
1087            LOG.warn("Problem closing wal: " + exception.getMessage());
1088            LOG.debug("exception details", exception);
1089          }
1090        }
1091      }
1092
1093      // close the region now, and reopen again
1094      HBaseTestingUtility.closeRegionAndWAL(this.region);
1095      region = HRegion.openHRegion(region, null);
1096
      // now check whether we can read back the data from the region
1098      for (long i = minSeqId; i < maxSeqId; i++) {
1099        Get get = new Get(Bytes.toBytes(i));
1100        Result result = region.get(get);
1101        byte[] value = result.getValue(family, Bytes.toBytes(i));
1102        assertArrayEquals(Bytes.toBytes(i), value);
1103      }
1104    } finally {
1105      HBaseTestingUtility.closeRegionAndWAL(this.region);
1106      this.region = null;
1107      wals.close();
1108    }
1109  }
1110
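  /**
   * Mockito {@link ArgumentMatcher} that recognizes WALEdits carrying a flush descriptor whose
   * action is one of the configured {@link FlushAction}s.
   */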
1111  static class IsFlushWALMarker implements ArgumentMatcher<WALEdit> {
1112    volatile FlushAction[] actions;
1113    public IsFlushWALMarker(FlushAction... actions) {
1114      this.actions = actions;
1115    }
1116    @Override
1117    public boolean matches(WALEdit edit) {
1118      List<Cell> cells = edit.getCells();
1119      if (cells.isEmpty()) {
1120        return false;
1121      }
1122      if (WALEdit.isMetaEditFamily(cells.get(0))) {
1123        FlushDescriptor desc;
1124        try {
1125          desc = WALEdit.getFlushDescriptor(cells.get(0));
1126        } catch (IOException e) {
1127          LOG.warn(e.toString(), e);
1128          return false;
1129        }
1130        if (desc != null) {
1131          for (FlushAction action : actions) {
1132            if (desc.getAction() == action) {
1133              return true;
1134            }
1135          }
1136        }
1137      }
1138      return false;
1139    }
1140    public IsFlushWALMarker set(FlushAction... actions) {
1141      this.actions = actions;
1142      return this;
1143    }
1144  }
1145
1146  @Test
1147  public void testFlushMarkersWALFail() throws Exception {
    // Test the cases where the WAL append for flush markers fails.
1149    byte[] family = Bytes.toBytes("family");
1150
    // Spy on an actual WAL implementation to throw an exception (we were not able to mock one)
1152    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log");
1153
1154    final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1155    FSUtils.setRootDir(walConf, logDir);
1156    // Make up a WAL that we can manipulate at append time.
1157    class FailAppendFlushMarkerWAL extends FSHLog {
1158      volatile FlushAction [] flushActions = null;
1159
1160      public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf)
1161      throws IOException {
1162        super(fs, root, logDir, conf);
1163      }
1164
1165      @Override
1166      protected Writer createWriterInstance(Path path) throws IOException {
1167        final Writer w = super.createWriterInstance(path);
1168        return new Writer() {
1169          @Override
1170          public void close() throws IOException {
1171            w.close();
1172          }
1173
1174          @Override
1175          public void sync(boolean forceSync) throws IOException {
1176            w.sync(forceSync);
1177          }
1178
1179          @Override
1180          public void append(Entry entry) throws IOException {
1181            List<Cell> cells = entry.getEdit().getCells();
1182            if (WALEdit.isMetaEditFamily(cells.get(0))) {
1183              FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0));
1184              if (desc != null) {
1185                for (FlushAction flushAction: flushActions) {
1186                  if (desc.getAction().equals(flushAction)) {
1187                    throw new IOException("Failed to append flush marker! " + flushAction);
1188                  }
1189                }
1190              }
1191            }
1192            w.append(entry);
1193          }
1194
1195          @Override
1196          public long getLength() {
1197            return w.getLength();
1198          }
1199        };
1200      }
1201    }
1202    FailAppendFlushMarkerWAL wal =
1203      new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1204        method, walConf);
1205    wal.init();
1206    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1207      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1208    int i = 0;
1209    Put put = new Put(Bytes.toBytes(i));
1210    put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
1211    put.addColumn(family, Bytes.toBytes(i), Bytes.toBytes(i));
1212    region.put(put);
1213
1214    // 1. Test case where START_FLUSH throws exception
1215    wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};
1216
1217    // start cache flush will throw exception
1218    try {
1219      region.flush(true);
1220      fail("This should have thrown exception");
1221    } catch (DroppedSnapshotException unexpected) {
      // This should not be a DroppedSnapshotException, meaning that the RS will not abort
1223      throw unexpected;
1224    } catch (IOException expected) {
1225      // expected
1226    }
1227    // The WAL is hosed now. It has two edits appended. We cannot roll the log without it
1228    // throwing a DroppedSnapshotException to force an abort. Just clean up the mess.
1229    region.close(true);
1230    wal.close();
1231
1232    // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
1233    wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
1234    wal = new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1235          method, walConf);
1236    wal.init();
1237    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
1238      HConstants.EMPTY_END_ROW, false, Durability.USE_DEFAULT, wal, family);
1239    region.put(put);
1240
1241    // 3. Test case where ABORT_FLUSH will throw exception.
    // Even if ABORT_FLUSH throws an exception, we should not fail with an IOE, but continue with
    // DroppedSnapshotException. The COMMIT_FLUSH below will cause the flush to abort.
1244    wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH};
1245
1246    try {
1247      region.flush(true);
1248      fail("This should have thrown exception");
1249    } catch (DroppedSnapshotException expected) {
1250      // we expect this exception, since we were able to write the snapshot, but failed to
1251      // write the flush marker to WAL
1252    } catch (IOException unexpected) {
1253      throw unexpected;
1254    }
1255  }
1256
1257  @Test
1258  public void testGetWhileRegionClose() throws IOException {
1259    Configuration hc = initSplit();
1260    int numRows = 100;
1261    byte[][] families = { fam1, fam2, fam3 };
1262
1263    // Setting up region
1264    this.region = initHRegion(tableName, method, hc, families);
1265    // Put data in region
1266    final int startRow = 100;
1267    putData(startRow, numRows, qual1, families);
1268    putData(startRow, numRows, qual2, families);
1269    putData(startRow, numRows, qual3, families);
1270    final AtomicBoolean done = new AtomicBoolean(false);
1271    final AtomicInteger gets = new AtomicInteger(0);
1272    GetTillDoneOrException[] threads = new GetTillDoneOrException[10];
1273    try {
1274      // Set ten threads running concurrently getting from the region.
1275      for (int i = 0; i < threads.length / 2; i++) {
1276        threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1277        threads[i].setDaemon(true);
1278        threads[i].start();
1279      }
1280      // Artificially make the condition by setting closing flag explicitly.
1281      // I can't make the issue happen with a call to region.close().
1282      this.region.closing.set(true);
1283      for (int i = threads.length / 2; i < threads.length; i++) {
1284        threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1285        threads[i].setDaemon(true);
1286        threads[i].start();
1287      }
1288    } finally {
1289      if (this.region != null) {
1290        HBaseTestingUtility.closeRegionAndWAL(this.region);
1291        this.region = null;
1292      }
1293    }
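    // The region has been closed; now stop the getters and make sure none of them died with an
    // NPE while the region was shutting down.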
1294    done.set(true);
1295    for (GetTillDoneOrException t : threads) {
1296      try {
1297        t.join();
1298      } catch (InterruptedException e) {
1299        e.printStackTrace();
1300      }
1301      if (t.e != null) {
1302        LOG.info("Exception=" + t.e);
1303        assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
1304      }
1305    }
1306  }
1307
  /*
   * Thread that issues gets against a single row until the 'done' flag is flipped. If an
   * exception makes it fail, the exception is recorded.
   */
1312  class GetTillDoneOrException extends Thread {
1313    private final Get g;
1314    private final AtomicBoolean done;
1315    private final AtomicInteger count;
1316    private Exception e;
1317
1318    GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d,
1319        final AtomicInteger c) {
1320      super("getter." + i);
1321      this.g = new Get(r);
1322      this.done = d;
1323      this.count = c;
1324    }
1325
1326    @Override
1327    public void run() {
1328      while (!this.done.get()) {
1329        try {
1330          assertTrue(region.get(g).size() > 0);
1331          this.count.incrementAndGet();
1332        } catch (Exception e) {
1333          this.e = e;
1334          break;
1335        }
1336      }
1337    }
1338  }
1339
  /*
   * An involved filter test with multiple column families and deletes mixed in.
   */
1343  @Test
1344  public void testWeirdCacheBehaviour() throws Exception {
1345    final TableName tableName = TableName.valueOf(name.getMethodName());
1346    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
1347        Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
1348    this.region = initHRegion(tableName, method, CONF, FAMILIES);
1349    String value = "this is the value";
1350    String value2 = "this is some other value";
1351    String keyPrefix1 = "prefix1";
1352    String keyPrefix2 = "prefix2";
1353    String keyPrefix3 = "prefix3";
1354    putRows(this.region, 3, value, keyPrefix1);
1355    putRows(this.region, 3, value, keyPrefix2);
1356    putRows(this.region, 3, value, keyPrefix3);
1357    putRows(this.region, 3, value2, keyPrefix1);
1358    putRows(this.region, 3, value2, keyPrefix2);
1359    putRows(this.region, 3, value2, keyPrefix3);
1360    System.out.println("Checking values for key: " + keyPrefix1);
1361    assertEquals("Got back incorrect number of rows from scan", 3,
1362        getNumberOfRows(keyPrefix1, value2, this.region));
1363    System.out.println("Checking values for key: " + keyPrefix2);
1364    assertEquals("Got back incorrect number of rows from scan", 3,
1365        getNumberOfRows(keyPrefix2, value2, this.region));
1366    System.out.println("Checking values for key: " + keyPrefix3);
1367    assertEquals("Got back incorrect number of rows from scan", 3,
1368        getNumberOfRows(keyPrefix3, value2, this.region));
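    // Delete the trans-tags:qual2 column holding value2 from every matching row under each
    // prefix; the scans below should then come back empty.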
1369    deleteColumns(this.region, value2, keyPrefix1);
1370    deleteColumns(this.region, value2, keyPrefix2);
1371    deleteColumns(this.region, value2, keyPrefix3);
1372    System.out.println("Starting important checks.....");
1373    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0,
1374        getNumberOfRows(keyPrefix1, value2, this.region));
1375    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0,
1376        getNumberOfRows(keyPrefix2, value2, this.region));
1377    assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0,
1378        getNumberOfRows(keyPrefix3, value2, this.region));
1379  }
1380
1381  @Test
1382  public void testAppendWithReadOnlyTable() throws Exception {
1383    final TableName tableName = TableName.valueOf(name.getMethodName());
1384    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1385    boolean exceptionCaught = false;
1386    Append append = new Append(Bytes.toBytes("somerow"));
1387    append.setDurability(Durability.SKIP_WAL);
1388    append.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"),
1389        Bytes.toBytes("somevalue"));
1390    try {
1391      region.append(append);
1392    } catch (IOException e) {
1393      exceptionCaught = true;
1394    }
    assertTrue(exceptionCaught);
1396  }
1397
1398  @Test
1399  public void testIncrWithReadOnlyTable() throws Exception {
1400    final TableName tableName = TableName.valueOf(name.getMethodName());
1401    this.region = initHRegion(tableName, method, CONF, true, Bytes.toBytes("somefamily"));
1402    boolean exceptionCaught = false;
1403    Increment inc = new Increment(Bytes.toBytes("somerow"));
1404    inc.setDurability(Durability.SKIP_WAL);
1405    inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);
1406    try {
1407      region.increment(inc);
1408    } catch (IOException e) {
1409      exceptionCaught = true;
1410    }
    assertTrue(exceptionCaught);
1412  }
1413
1414  private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException {
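    // Scan for rows under keyPrefix whose trans-tags:qual2 value matches, delete that column from
    // each matching row, and expect exactly three such rows.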
1415    InternalScanner scanner = buildScanner(keyPrefix, value, r);
1416    int count = 0;
1417    boolean more = false;
1418    List<Cell> results = new ArrayList<>();
1419    do {
1420      more = scanner.next(results);
1421      if (results != null && !results.isEmpty())
1422        count++;
1423      else
1424        break;
1425      Delete delete = new Delete(CellUtil.cloneRow(results.get(0)));
1426      delete.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
1427      r.delete(delete);
1428      results.clear();
1429    } while (more);
1430    assertEquals("Did not perform correct number of deletes", 3, count);
1431  }
1432
1433  private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception {
1434    InternalScanner resultScanner = buildScanner(keyPrefix, value, r);
1435    int numberOfResults = 0;
1436    List<Cell> results = new ArrayList<>();
1437    boolean more = false;
1438    do {
1439      more = resultScanner.next(results);
1440      if (results != null && !results.isEmpty())
1441        numberOfResults++;
1442      else
1443        break;
1444      for (Cell kv : results) {
1445        System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv)));
1446      }
1447      results.clear();
1448    } while (more);
1449    return numberOfResults;
1450  }
1451
1452  private InternalScanner buildScanner(String keyPrefix, String value, HRegion r)
1453      throws IOException {
    // Defaults to FilterList.Operator.MUST_PASS_ALL.
1455    FilterList allFilters = new FilterList();
1456    allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
1457    // Only return rows where this column value exists in the row.
1458    SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"),
1459        Bytes.toBytes("qual2"), CompareOperator.EQUAL, Bytes.toBytes(value));
1460    filter.setFilterIfMissing(true);
1461    allFilters.addFilter(filter);
1462    Scan scan = new Scan();
1463    scan.addFamily(Bytes.toBytes("trans-blob"));
1464    scan.addFamily(Bytes.toBytes("trans-type"));
1465    scan.addFamily(Bytes.toBytes("trans-date"));
1466    scan.addFamily(Bytes.toBytes("trans-tags"));
1467    scan.addFamily(Bytes.toBytes("trans-group"));
1468    scan.setFilter(allFilters);
1469    return r.getScanner(scan);
1470  }
1471
1472  private void putRows(HRegion r, int numRows, String value, String key) throws IOException {
1473    for (int i = 0; i < numRows; i++) {
1474      String row = key + "_" + i/* UUID.randomUUID().toString() */;
1475      System.out.println(String.format("Saving row: %s, with value %s", row, value));
1476      Put put = new Put(Bytes.toBytes(row));
1477      put.setDurability(Durability.SKIP_WAL);
1478      put.addColumn(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob"));
1479      put.addColumn(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
1480      put.addColumn(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999"));
1481      put.addColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value));
1482      put.addColumn(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId"));
1483      r.put(put);
1484    }
1485  }
1486
1487  @Test
1488  public void testFamilyWithAndWithoutColon() throws Exception {
1489    byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1490    this.region = initHRegion(tableName, method, CONF, cf);
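    // The family name with a trailing ':' does not match any declared family (':' is the
    // family/qualifier delimiter), so the put below should fail with NoSuchColumnFamilyException.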
1491    Put p = new Put(tableName.toBytes());
1492    byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
1493    p.addColumn(cfwithcolon, cfwithcolon, cfwithcolon);
1494    boolean exception = false;
1495    try {
1496      this.region.put(p);
1497    } catch (NoSuchColumnFamilyException e) {
1498      exception = true;
1499    }
1500    assertTrue(exception);
1501  }
1502
1503  @Test
1504  public void testBatchPut_whileNoRowLocksHeld() throws IOException {
1505    final Put[] puts = new Put[10];
1506    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1507    long syncs = prepareRegionForBachPut(puts, source, false);
1508
1509    OperationStatus[] codes = this.region.batchMutate(puts);
1510    assertEquals(10, codes.length);
1511    for (int i = 0; i < 10; i++) {
1512      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1513    }
1514    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1515
1516    LOG.info("Next a batch put with one invalid family");
1517    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1518    codes = this.region.batchMutate(puts);
1519    assertEquals(10, codes.length);
1520    for (int i = 0; i < 10; i++) {
1521      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1522          codes[i].getOperationStatusCode());
1523    }
1524
1525    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
1526  }
1527
1528  @Test
1529  public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
1530    final Put[] puts = new Put[10];
1531    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1532    long syncs = prepareRegionForBachPut(puts, source, false);
1533
1534    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1535
1536    LOG.info("batchPut will have to break into four batches to avoid row locks");
1537    RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
1538    RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_1"));
1539    RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_3"));
1540    RowLock rowLock4 = region.getRowLock(Bytes.toBytes("row_3"), true);
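    // The second lock on row_3 passes 'true', presumably requesting a shared (read) lock in
    // addition to the exclusive locks above; it is only released at the very end of the test.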
1541
1542    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1543    final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<>();
1544    final CountDownLatch startingPuts = new CountDownLatch(1);
1545    final CountDownLatch startingClose = new CountDownLatch(1);
1546    TestThread putter = new TestThread(ctx) {
1547      @Override
1548      public void doWork() throws IOException {
1549        startingPuts.countDown();
1550        retFromThread.set(region.batchMutate(puts));
1551      }
1552    };
1553    LOG.info("...starting put thread while holding locks");
1554    ctx.addThread(putter);
1555    ctx.startThreads();
1556
    // Now attempt to close the region from another thread.  Prior to HBASE-12565
    // this would cause the in-progress batchMutate operation to fail with an
    // exception because it used to release and re-acquire the close-guard lock
    // between batches.  The caller then didn't get status indicating which writes succeeded.
    // We now expect this thread to block until the batchMutate call finishes.
1562    Thread regionCloseThread = new TestThread(ctx) {
1563      @Override
1564      public void doWork() {
1565        try {
1566          startingPuts.await();
1567          // Give some time for the batch mutate to get in.
1568          // We don't want to race with the mutate
1569          Thread.sleep(10);
1570          startingClose.countDown();
1571          HBaseTestingUtility.closeRegionAndWAL(region);
1572          region = null;
1573        } catch (IOException e) {
1574          throw new RuntimeException(e);
1575        } catch (InterruptedException e) {
1576          throw new RuntimeException(e);
1577        }
1578      }
1579    };
1580    regionCloseThread.start();
1581
1582    startingClose.await();
1583    startingPuts.await();
1584    Thread.sleep(100);
    LOG.info("...releasing row locks 1-3, which should let the put thread continue");
1586    rowLock1.release();
1587    rowLock2.release();
1588    rowLock3.release();
1589    waitForCounter(source, "syncTimeNumOps", syncs + 1);
1590
1591    LOG.info("...joining on put thread");
1592    ctx.stop();
1593    regionCloseThread.join();
1594
1595    OperationStatus[] codes = retFromThread.get();
1596    for (int i = 0; i < codes.length; i++) {
1597      assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1598          codes[i].getOperationStatusCode());
1599    }
1600    rowLock4.release();
1601  }
1602
1603  private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
1604      throws InterruptedException {
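    // Poll the WAL metrics source until the counter reaches expectedCount, giving up (and failing
    // the test) after roughly ten seconds.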
1605    long startWait = System.currentTimeMillis();
1606    long currentCount;
1607    while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
1608      Thread.sleep(100);
1609      if (System.currentTimeMillis() - startWait > 10000) {
1610        fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
1611            expectedCount, currentCount));
1612      }
1613    }
1614  }
1615
1616  @Test
1617  public void testAtomicBatchPut() throws IOException {
1618    final Put[] puts = new Put[10];
1619    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1620    long syncs = prepareRegionForBachPut(puts, source, false);
1621
    // 1. Straightforward case, should succeed
1623    MutationBatchOperation batchOp = new MutationBatchOperation(region, puts, true,
1624        HConstants.NO_NONCE, HConstants.NO_NONCE);
1625    OperationStatus[] codes = this.region.batchMutate(batchOp);
1626    assertEquals(10, codes.length);
1627    for (int i = 0; i < 10; i++) {
1628      assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1629    }
1630    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1631
1632    // 2. Failed to get lock
1633    RowLock lock = region.getRowLock(Bytes.toBytes("row_" + 3));
    // {@link HRegion#getRowLock(byte[])} is reentrant, so a batchMutate from this thread would
    // simply re-acquire the lock on 'row_3'; run {@link HRegion#batchMutate(HRegion.BatchOperation)}
    // in a different thread to exercise the lock-acquisition failure
1636    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1637    final AtomicReference<IOException> retFromThread = new AtomicReference<>();
1638    final CountDownLatch finishedPuts = new CountDownLatch(1);
1639    final MutationBatchOperation finalBatchOp = new MutationBatchOperation(region, puts, true,
1640        HConstants
1641        .NO_NONCE,
1642        HConstants.NO_NONCE);
1643    TestThread putter = new TestThread(ctx) {
1644      @Override
1645      public void doWork() throws IOException {
1646        try {
1647          region.batchMutate(finalBatchOp);
1648        } catch (IOException ioe) {
1649          LOG.error("test failed!", ioe);
1650          retFromThread.set(ioe);
1651        }
1652        finishedPuts.countDown();
1653      }
1654    };
1655    LOG.info("...starting put thread while holding locks");
1656    ctx.addThread(putter);
1657    ctx.startThreads();
1658    LOG.info("...waiting for batch puts while holding locks");
1659    try {
1660      finishedPuts.await();
1661    } catch (InterruptedException e) {
1662      LOG.error("Interrupted!", e);
1663    } finally {
1664      if (lock != null) {
1665        lock.release();
1666      }
1667    }
1668    assertNotNull(retFromThread.get());
1669    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1670
1671    // 3. Exception thrown in validation
1672    LOG.info("Next a batch put with one invalid family");
1673    puts[5].addColumn(Bytes.toBytes("BAD_CF"), qual, value);
1674    batchOp = new MutationBatchOperation(region, puts, true, HConstants.NO_NONCE,
1675        HConstants.NO_NONCE);
1676    thrown.expect(NoSuchColumnFamilyException.class);
1677    this.region.batchMutate(batchOp);
1678  }
1679
1680  @Test
1681  public void testBatchPutWithTsSlop() throws Exception {
    // Add data with a timestamp that is too far in the future for the allowed slop range; ensure
    // it is rejected by the sanity check
1683    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1684    final Put[] puts = new Put[10];
1685    MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1686
1687    long syncs = prepareRegionForBachPut(puts, source, true);
1688
1689    OperationStatus[] codes = this.region.batchMutate(puts);
1690    assertEquals(10, codes.length);
1691    for (int i = 0; i < 10; i++) {
1692      assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
1693    }
1694    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1695  }
1696
  /**
   * Initializes the region and fills {@code puts} with ten single-column puts; when {@code slop}
   * is true the puts carry a far-future timestamp so the timestamp-slop sanity check rejects them.
   * @return the initial syncTimeNumOps count
   */
1700  private long prepareRegionForBachPut(final Put[] puts, final MetricsWALSource source,
1701      boolean slop) throws IOException {
1702    this.region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
1703
1704    LOG.info("First a batch put with all valid puts");
1705    for (int i = 0; i < puts.length; i++) {
1706      puts[i] = slop ? new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100) :
1707          new Put(Bytes.toBytes("row_" + i));
1708      puts[i].addColumn(COLUMN_FAMILY_BYTES, qual, value);
1709    }
1710
1711    long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1712    metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1713    return syncs;
1714  }
1715
1716  // ////////////////////////////////////////////////////////////////////////////
1717  // checkAndMutate tests
1718  // ////////////////////////////////////////////////////////////////////////////
1719  @Test
1720  public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
1721    byte[] row1 = Bytes.toBytes("row1");
1722    byte[] fam1 = Bytes.toBytes("fam1");
1723    byte[] qf1 = Bytes.toBytes("qualifier");
1724    byte[] emptyVal = new byte[] {};
1725    byte[] val1 = Bytes.toBytes("value1");
1726    byte[] val2 = Bytes.toBytes("value2");
1727
1728    // Setting up region
1729    this.region = initHRegion(tableName, method, CONF, fam1);
1730    // Putting empty data in key
1731    Put put = new Put(row1);
1732    put.addColumn(fam1, qf1, emptyVal);
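    // Note that nothing has been written to row1 yet; with an EQUAL comparison against an empty
    // value, a missing cell counts as a match, so the checkAndMutate below should both pass the
    // check and apply the put.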
1733
1734    // checkAndPut with empty value
1735    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1736        new BinaryComparator(emptyVal), put);
1737    assertTrue(res);
1738
1739    // Putting data in key
1740    put = new Put(row1);
1741    put.addColumn(fam1, qf1, val1);
1742
1743    // checkAndPut with correct value
1744    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1745        new BinaryComparator(emptyVal), put);
1746    assertTrue(res);
1747
1748    // not empty anymore
1749    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1750        new BinaryComparator(emptyVal), put);
1751    assertFalse(res);
1752
1753    Delete delete = new Delete(row1);
1754    delete.addColumn(fam1, qf1);
1755    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1756        new BinaryComparator(emptyVal), delete);
1757    assertFalse(res);
1758
1759    put = new Put(row1);
1760    put.addColumn(fam1, qf1, val2);
1761    // checkAndPut with correct value
1762    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1763        new BinaryComparator(val1), put);
1764    assertTrue(res);
1765
1766    // checkAndDelete with correct value
1767    delete = new Delete(row1);
1768    delete.addColumn(fam1, qf1);
1769    delete.addColumn(fam1, qf1);
1770    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1771        new BinaryComparator(val2), delete);
1772    assertTrue(res);
1773
1774    delete = new Delete(row1);
1775    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1776        new BinaryComparator(emptyVal), delete);
1777    assertTrue(res);
1778
1779    // checkAndPut looking for a null value
1780    put = new Put(row1);
1781    put.addColumn(fam1, qf1, val1);
1782
1783    res = region
1784        .checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new NullComparator(), put);
1785    assertTrue(res);
1786  }
1787
1788  @Test
1789  public void testCheckAndMutate_WithWrongValue() throws IOException {
1790    byte[] row1 = Bytes.toBytes("row1");
1791    byte[] fam1 = Bytes.toBytes("fam1");
1792    byte[] qf1 = Bytes.toBytes("qualifier");
1793    byte[] val1 = Bytes.toBytes("value1");
1794    byte[] val2 = Bytes.toBytes("value2");
1795    BigDecimal bd1 = new BigDecimal(Double.MAX_VALUE);
1796    BigDecimal bd2 = new BigDecimal(Double.MIN_VALUE);
1797
1798    // Setting up region
1799    this.region = initHRegion(tableName, method, CONF, fam1);
1800    // Putting data in key
1801    Put put = new Put(row1);
1802    put.addColumn(fam1, qf1, val1);
1803    region.put(put);
1804
1805    // checkAndPut with wrong value
1806    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1807        new BinaryComparator(val2), put);
1808    assertEquals(false, res);
1809
1810    // checkAndDelete with wrong value
1811    Delete delete = new Delete(row1);
1812    delete.addFamily(fam1);
1813    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
        new BinaryComparator(val2), delete);
1815    assertEquals(false, res);
1816
1817    // Putting data in key
1818    put = new Put(row1);
1819    put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1820    region.put(put);
1821
1822    // checkAndPut with wrong value
1823    res =
1824        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1825            new BigDecimalComparator(bd2), put);
1826    assertEquals(false, res);
1827
1828    // checkAndDelete with wrong value
1829    delete = new Delete(row1);
1830    delete.addFamily(fam1);
1831    res =
1832        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
            new BigDecimalComparator(bd2), delete);
1834    assertEquals(false, res);
1835  }
1836
1837  @Test
1838  public void testCheckAndMutate_WithCorrectValue() throws IOException {
1839    byte[] row1 = Bytes.toBytes("row1");
1840    byte[] fam1 = Bytes.toBytes("fam1");
1841    byte[] qf1 = Bytes.toBytes("qualifier");
1842    byte[] val1 = Bytes.toBytes("value1");
1843    BigDecimal bd1 = new BigDecimal(Double.MIN_VALUE);
1844
1845    // Setting up region
1846    this.region = initHRegion(tableName, method, CONF, fam1);
1847    // Putting data in key
1848    Put put = new Put(row1);
1849    put.addColumn(fam1, qf1, val1);
1850    region.put(put);
1851
1852    // checkAndPut with correct value
1853    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1854        new BinaryComparator(val1), put);
1855    assertEquals(true, res);
1856
1857    // checkAndDelete with correct value
1858    Delete delete = new Delete(row1);
1859    delete.addColumn(fam1, qf1);
1860    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
1861        delete);
1862    assertEquals(true, res);
1863
1864    // Putting data in key
1865    put = new Put(row1);
1866    put.addColumn(fam1, qf1, Bytes.toBytes(bd1));
1867    region.put(put);
1868
1869    // checkAndPut with correct value
1870    res =
1871        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1872            bd1), put);
1873    assertEquals(true, res);
1874
1875    // checkAndDelete with correct value
1876    delete = new Delete(row1);
1877    delete.addColumn(fam1, qf1);
1878    res =
1879        region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BigDecimalComparator(
1880            bd1), delete);
1881    assertEquals(true, res);
1882  }
1883
1884  @Test
1885  public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
1886    byte[] row1 = Bytes.toBytes("row1");
1887    byte[] fam1 = Bytes.toBytes("fam1");
1888    byte[] qf1 = Bytes.toBytes("qualifier");
1889    byte[] val1 = Bytes.toBytes("value1");
1890    byte[] val2 = Bytes.toBytes("value2");
1891    byte[] val3 = Bytes.toBytes("value3");
1892    byte[] val4 = Bytes.toBytes("value4");
1893
1894    // Setting up region
1895    this.region = initHRegion(tableName, method, CONF, fam1);
1896    // Putting val3 in key
1897    Put put = new Put(row1);
1898    put.addColumn(fam1, qf1, val3);
1899    region.put(put);
1900
1901    // Test CompareOp.LESS: original = val3, compare with val3, fail
1902    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1903        new BinaryComparator(val3), put);
1904    assertEquals(false, res);
1905
1906    // Test CompareOp.LESS: original = val3, compare with val4, fail
1907    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1908        new BinaryComparator(val4), put);
1909    assertEquals(false, res);
1910
1911    // Test CompareOp.LESS: original = val3, compare with val2,
1912    // succeed (now value = val2)
1913    put = new Put(row1);
1914    put.addColumn(fam1, qf1, val2);
1915    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS,
1916        new BinaryComparator(val2), put);
1917    assertEquals(true, res);
1918
1919    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
1920    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1921        new BinaryComparator(val3), put);
1922    assertEquals(false, res);
1923
1924    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
1925    // succeed (value still = val2)
1926    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1927        new BinaryComparator(val2), put);
1928    assertEquals(true, res);
1929
1930    // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
1931    // succeed (now value = val3)
1932    put = new Put(row1);
1933    put.addColumn(fam1, qf1, val3);
1934    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.LESS_OR_EQUAL,
1935        new BinaryComparator(val1), put);
1936    assertEquals(true, res);
1937
1938    // Test CompareOp.GREATER: original = val3, compare with val3, fail
1939    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
1940        new BinaryComparator(val3), put);
1941    assertEquals(false, res);
1942
1943    // Test CompareOp.GREATER: original = val3, compare with val2, fail
1944    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
1945        new BinaryComparator(val2), put);
1946    assertEquals(false, res);
1947
1948    // Test CompareOp.GREATER: original = val3, compare with val4,
1949    // succeed (now value = val2)
1950    put = new Put(row1);
1951    put.addColumn(fam1, qf1, val2);
1952    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER,
1953        new BinaryComparator(val4), put);
1954    assertEquals(true, res);
1955
1956    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
1957    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
1958        new BinaryComparator(val1), put);
1959    assertEquals(false, res);
1960
1961    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
1962    // succeed (value still = val2)
1963    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
1964        new BinaryComparator(val2), put);
1965    assertEquals(true, res);
1966
1967    // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
1968    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.GREATER_OR_EQUAL,
1969        new BinaryComparator(val3), put);
1970    assertEquals(true, res);
1971  }
1972
1973  @Test
1974  public void testCheckAndPut_ThatPutWasWritten() throws IOException {
1975    byte[] row1 = Bytes.toBytes("row1");
1976    byte[] fam1 = Bytes.toBytes("fam1");
1977    byte[] fam2 = Bytes.toBytes("fam2");
1978    byte[] qf1 = Bytes.toBytes("qualifier");
1979    byte[] val1 = Bytes.toBytes("value1");
1980    byte[] val2 = Bytes.toBytes("value2");
1981
1982    byte[][] families = { fam1, fam2 };
1983
1984    // Setting up region
1985    this.region = initHRegion(tableName, method, CONF, families);
1986    // Putting data in the key to check
1987    Put put = new Put(row1);
1988    put.addColumn(fam1, qf1, val1);
1989    region.put(put);
1990
1991    // Creating put to add
1992    long ts = System.currentTimeMillis();
1993    KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
1994    put = new Put(row1);
1995    put.add(kv);
1996
    // checkAndPut with correct value
1998    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
1999        new BinaryComparator(val1), put);
2000    assertEquals(true, res);
2001
2002    Get get = new Get(row1);
2003    get.addColumn(fam2, qf1);
2004    Cell[] actual = region.get(get).rawCells();
2005
2006    Cell[] expected = { kv };
2007
2008    assertEquals(expected.length, actual.length);
2009    for (int i = 0; i < actual.length; i++) {
2010      assertEquals(expected[i], actual[i]);
2011    }
2012  }
2013
2014  @Test
2015  public void testCheckAndPut_wrongRowInPut() throws IOException {
2016    this.region = initHRegion(tableName, method, CONF, COLUMNS);
2017    Put put = new Put(row2);
2018    put.addColumn(fam1, qual1, value1);
2019    try {
2020      region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL,
2021          new BinaryComparator(value2), put);
2022      fail();
2023    } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
2024      // expected exception.
2025    }
2026  }
2027
2028  @Test
2029  public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
2030    byte[] row1 = Bytes.toBytes("row1");
2031    byte[] fam1 = Bytes.toBytes("fam1");
2032    byte[] fam2 = Bytes.toBytes("fam2");
2033    byte[] qf1 = Bytes.toBytes("qualifier1");
2034    byte[] qf2 = Bytes.toBytes("qualifier2");
2035    byte[] qf3 = Bytes.toBytes("qualifier3");
2036    byte[] val1 = Bytes.toBytes("value1");
2037    byte[] val2 = Bytes.toBytes("value2");
2038    byte[] val3 = Bytes.toBytes("value3");
2039    byte[] emptyVal = new byte[] {};
2040
2041    byte[][] families = { fam1, fam2 };
2042
2043    // Setting up region
2044    this.region = initHRegion(tableName, method, CONF, families);
2045    // Put content
2046    Put put = new Put(row1);
2047    put.addColumn(fam1, qf1, val1);
2048    region.put(put);
2049    Threads.sleep(2);
2050
2051    put = new Put(row1);
2052    put.addColumn(fam1, qf1, val2);
2053    put.addColumn(fam2, qf1, val3);
2054    put.addColumn(fam2, qf2, val2);
2055    put.addColumn(fam2, qf3, val1);
2056    put.addColumn(fam1, qf3, val1);
2057    region.put(put);
2058
2059    // Multi-column delete
2060    Delete delete = new Delete(row1);
2061    delete.addColumn(fam1, qf1);
2062    delete.addColumn(fam2, qf1);
2063    delete.addColumn(fam1, qf3);
2064    boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL,
2065        new BinaryComparator(val2), delete);
2066    assertEquals(true, res);
2067
2068    Get get = new Get(row1);
2069    get.addColumn(fam1, qf1);
2070    get.addColumn(fam1, qf3);
2071    get.addColumn(fam2, qf2);
2072    Result r = region.get(get);
2073    assertEquals(2, r.size());
2074    assertArrayEquals(val1, r.getValue(fam1, qf1));
2075    assertArrayEquals(val2, r.getValue(fam2, qf2));
2076
2077    // Family delete
2078    delete = new Delete(row1);
2079    delete.addFamily(fam2);
2080    res = region.checkAndMutate(row1, fam2, qf1, CompareOperator.EQUAL,
2081        new BinaryComparator(emptyVal), delete);
2082    assertEquals(true, res);
2083
2084    get = new Get(row1);
2085    r = region.get(get);
2086    assertEquals(1, r.size());
2087    assertArrayEquals(val1, r.getValue(fam1, qf1));
2088
2089    // Row delete
2090    delete = new Delete(row1);
2091    res = region.checkAndMutate(row1, fam1, qf1, CompareOperator.EQUAL, new BinaryComparator(val1),
2092        delete);
2093    assertEquals(true, res);
2094    get = new Get(row1);
2095    r = region.get(get);
2096    assertEquals(0, r.size());
2097  }
2098
2099  // ////////////////////////////////////////////////////////////////////////////
2100  // Delete tests
2101  // ////////////////////////////////////////////////////////////////////////////
2102  @Test
2103  public void testDelete_multiDeleteColumn() throws IOException {
2104    byte[] row1 = Bytes.toBytes("row1");
2105    byte[] fam1 = Bytes.toBytes("fam1");
2106    byte[] qual = Bytes.toBytes("qualifier");
2107    byte[] value = Bytes.toBytes("value");
2108
2109    Put put = new Put(row1);
2110    put.addColumn(fam1, qual, 1, value);
2111    put.addColumn(fam1, qual, 2, value);
2112
2113    this.region = initHRegion(tableName, method, CONF, fam1);
2114    region.put(put);
2115
2116    // We do support deleting more than 1 'latest' version
2117    Delete delete = new Delete(row1);
2118    delete.addColumn(fam1, qual);
2119    delete.addColumn(fam1, qual);
2120    region.delete(delete);
2121
2122    Get get = new Get(row1);
2123    get.addFamily(fam1);
2124    Result r = region.get(get);
2125    assertEquals(0, r.size());
2126  }
2127
2128  @Test
2129  public void testDelete_CheckFamily() throws IOException {
2130    byte[] row1 = Bytes.toBytes("row1");
2131    byte[] fam1 = Bytes.toBytes("fam1");
2132    byte[] fam2 = Bytes.toBytes("fam2");
2133    byte[] fam3 = Bytes.toBytes("fam3");
2134    byte[] fam4 = Bytes.toBytes("fam4");
2135
2136    // Setting up region
2137    this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
2138    List<Cell> kvs = new ArrayList<>();
2139    kvs.add(new KeyValue(row1, fam4, null, null));
2140
2141    // testing existing family
2142    byte[] family = fam2;
2143    NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2144    deleteMap.put(family, kvs);
2145    region.delete(deleteMap, Durability.SYNC_WAL);
2146
2147    // testing non existing family
2148    boolean ok = false;
2149    family = fam4;
2150    try {
2151      deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2152      deleteMap.put(family, kvs);
2153      region.delete(deleteMap, Durability.SYNC_WAL);
2154    } catch (Exception e) {
2155      ok = true;
2156    }
    assertTrue("Delete for non-existent family " + new String(family, StandardCharsets.UTF_8)
        + " should have failed", ok);
2158  }
2159
2160  @Test
2161  public void testDelete_mixed() throws IOException, InterruptedException {
2162    byte[] fam = Bytes.toBytes("info");
2163    byte[][] families = { fam };
2164    this.region = initHRegion(tableName, method, CONF, families);
2165    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2166
2167    byte[] row = Bytes.toBytes("table_name");
2168    // column names
2169    byte[] serverinfo = Bytes.toBytes("serverinfo");
2170    byte[] splitA = Bytes.toBytes("splitA");
2171    byte[] splitB = Bytes.toBytes("splitB");
2172
2173    // add some data:
2174    Put put = new Put(row);
2175    put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2176    region.put(put);
2177
2178    put = new Put(row);
2179    put.addColumn(fam, splitB, Bytes.toBytes("reference_B"));
2180    region.put(put);
2181
2182    put = new Put(row);
2183    put.addColumn(fam, serverinfo, Bytes.toBytes("ip_address"));
2184    region.put(put);
2185
2186    // ok now delete a split:
2187    Delete delete = new Delete(row);
2188    delete.addColumns(fam, splitA);
2189    region.delete(delete);
2190
2191    // assert some things:
2192    Get get = new Get(row).addColumn(fam, serverinfo);
2193    Result result = region.get(get);
2194    assertEquals(1, result.size());
2195
2196    get = new Get(row).addColumn(fam, splitA);
2197    result = region.get(get);
2198    assertEquals(0, result.size());
2199
2200    get = new Get(row).addColumn(fam, splitB);
2201    result = region.get(get);
2202    assertEquals(1, result.size());
2203
2204    // Assert that after a delete, I can put.
2205    put = new Put(row);
2206    put.addColumn(fam, splitA, Bytes.toBytes("reference_A"));
2207    region.put(put);
2208    get = new Get(row);
2209    result = region.get(get);
2210    assertEquals(3, result.size());
2211
2212    // Now delete all... then test I can add stuff back
2213    delete = new Delete(row);
2214    region.delete(delete);
2215    assertEquals(0, region.get(get).size());
2216
2217    region.put(new Put(row).addColumn(fam, splitA, Bytes.toBytes("reference_A")));
2218    result = region.get(get);
2219    assertEquals(1, result.size());
2220  }
2221
2222  @Test
2223  public void testDeleteRowWithFutureTs() throws IOException {
2224    byte[] fam = Bytes.toBytes("info");
2225    byte[][] families = { fam };
2226    this.region = initHRegion(tableName, method, CONF, families);
2227    byte[] row = Bytes.toBytes("table_name");
2228    // column names
2229    byte[] serverinfo = Bytes.toBytes("serverinfo");
2230
2231    // add data in the far future
2232    Put put = new Put(row);
2233    put.addColumn(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value"));
2234    region.put(put);
2235
2236    // now delete something in the present
2237    Delete delete = new Delete(row);
2238    region.delete(delete);
2239
2240    // make sure we still see our data
2241    Get get = new Get(row).addColumn(fam, serverinfo);
2242    Result result = region.get(get);
2243    assertEquals(1, result.size());
2244
2245    // delete the future row
2246    delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3);
2247    region.delete(delete);
2248
2249    // make sure it is gone
2250    get = new Get(row).addColumn(fam, serverinfo);
2251    result = region.get(get);
2252    assertEquals(0, result.size());
2253  }
2254
2255  /**
2256   * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
2257   * the actual timestamp
2258   */
2259  @Test
2260  public void testPutWithLatestTS() throws IOException {
2261    byte[] fam = Bytes.toBytes("info");
2262    byte[][] families = { fam };
2263    this.region = initHRegion(tableName, method, CONF, families);
2264    byte[] row = Bytes.toBytes("row1");
2265    // column names
2266    byte[] qual = Bytes.toBytes("qual");
2267
2268    // add data with LATEST_TIMESTAMP, put without WAL
2269    Put put = new Put(row);
2270    put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2271    region.put(put);
2272
2273    // Make sure it shows up with an actual timestamp
2274    Get get = new Get(row).addColumn(fam, qual);
2275    Result result = region.get(get);
2276    assertEquals(1, result.size());
2277    Cell kv = result.rawCells()[0];
2278    LOG.info("Got: " + kv);
2279    assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2280        kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2281
2282    // Check same with WAL enabled (historically these took different
2283    // code paths, so check both)
2284    row = Bytes.toBytes("row2");
2285    put = new Put(row);
2286    put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2287    region.put(put);
2288
2289    // Make sure it shows up with an actual timestamp
2290    get = new Get(row).addColumn(fam, qual);
2291    result = region.get(get);
2292    assertEquals(1, result.size());
2293    kv = result.rawCells()[0];
2294    LOG.info("Got: " + kv);
2295    assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2296        kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2297  }
2298
2299  /**
2300   * Tests that there is server-side filtering for invalid timestamp upper
2301   * bound. Note that the timestamp lower bound is automatically handled for us
2302   * by the TTL field.
2303   */
2304  @Test
2305  public void testPutWithTsSlop() throws IOException {
2306    byte[] fam = Bytes.toBytes("info");
2307    byte[][] families = { fam };
2308
    // add data with a timestamp that is too far in the future for the allowed slop range; ensure
    // it is rejected by the sanity check
2310    CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
2311    this.region = initHRegion(tableName, method, CONF, families);
2312    boolean caughtExcep = false;
2313    try {
2314      // no TS specified == use latest. should not error
2315      region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"), Bytes.toBytes("value")));
2316      // TS out of range. should error
2317      region.put(new Put(row).addColumn(fam, Bytes.toBytes("qual"),
2318          System.currentTimeMillis() + 2000, Bytes.toBytes("value")));
2319      fail("Expected IOE for TS out of configured timerange");
2320    } catch (FailedSanityCheckException ioe) {
2321      LOG.debug("Received expected exception", ioe);
2322      caughtExcep = true;
2323    }
2324    assertTrue("Should catch FailedSanityCheckException", caughtExcep);
2325  }
2326
2327  @Test
2328  public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
2329    byte[] fam1 = Bytes.toBytes("columnA");
2330    byte[] fam2 = Bytes.toBytes("columnB");
2331    this.region = initHRegion(tableName, method, CONF, fam1, fam2);
2332    byte[] rowA = Bytes.toBytes("rowA");
2333    byte[] rowB = Bytes.toBytes("rowB");
2334
2335    byte[] value = Bytes.toBytes("value");
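    // Delete family columnA on rowA before any data exists; after the puts below, a scan over
    // both families should still return rowA first (its columnB cell survives) and then rowB.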
2336
2337    Delete delete = new Delete(rowA);
2338    delete.addFamily(fam1);
2339
2340    region.delete(delete);
2341
2342    // now create data.
2343    Put put = new Put(rowA);
2344    put.addColumn(fam2, null, value);
2345    region.put(put);
2346
2347    put = new Put(rowB);
2348    put.addColumn(fam1, null, value);
2349    put.addColumn(fam2, null, value);
2350    region.put(put);
2351
2352    Scan scan = new Scan();
2353    scan.addFamily(fam1).addFamily(fam2);
2354    InternalScanner s = region.getScanner(scan);
2355    List<Cell> results = new ArrayList<>();
2356    s.next(results);
2357    assertTrue(CellUtil.matchingRows(results.get(0), rowA));
2358
2359    results.clear();
2360    s.next(results);
2361    assertTrue(CellUtil.matchingRows(results.get(0), rowB));
2362  }
2363
2364  @Test
2365  public void testDataInMemoryWithoutWAL() throws IOException {
2366    FileSystem fs = FileSystem.get(CONF);
2367    Path rootDir = new Path(dir + "testDataInMemoryWithoutWAL");
2368    FSHLog hLog = new FSHLog(fs, rootDir, "testDataInMemoryWithoutWAL", CONF);
2369    hLog.init();
2370    // This chunk creation is done throughout the code base. Do we want to move it into core?
2371    // It is missing from this test. W/o it we NPE.
2372    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2373    region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
2374        COLUMN_FAMILY_BYTES);
2375
2376    Cell originalCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2377      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), value1);
2378    final long originalSize = originalCell.getSerializedSize();
2379
2380    Cell addCell = CellUtil.createCell(row, COLUMN_FAMILY_BYTES, qual1,
2381      System.currentTimeMillis(), KeyValue.Type.Put.getCode(), Bytes.toBytes("xxxxxxxxxx"));
2382    final long addSize = addCell.getSerializedSize();
2383
2384    LOG.info("originalSize:" + originalSize
2385      + ", addSize:" + addSize);
2386    // start test. We expect that the addPut's durability will be replaced
2387    // by originalPut's durability.
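    // Only when the original put skips the WAL (cases 1 and 2) should the dataInMemoryWithoutWAL
    // counter grow, and then by the combined size of both cells; when the original put syncs the
    // WAL (cases 3 and 4) the expected delta is 0.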
2388
2389    // case 1:
2390    testDataInMemoryWithoutWAL(region,
2391            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2392            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2393            originalSize + addSize);
2394
2395    // case 2:
2396    testDataInMemoryWithoutWAL(region,
2397            new Put(row).add(originalCell).setDurability(Durability.SKIP_WAL),
2398            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2399            originalSize + addSize);
2400
2401    // case 3:
2402    testDataInMemoryWithoutWAL(region,
2403            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2404            new Put(row).add(addCell).setDurability(Durability.SKIP_WAL),
2405            0);
2406
2407    // case 4:
2408    testDataInMemoryWithoutWAL(region,
2409            new Put(row).add(originalCell).setDurability(Durability.SYNC_WAL),
2410            new Put(row).add(addCell).setDurability(Durability.SYNC_WAL),
2411            0);
2412  }
2413
2414  private static void testDataInMemoryWithoutWAL(HRegion region, Put originalPut,
2415          final Put addPut, long delta) throws IOException {
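    // Swap in a mocked coprocessor host whose preBatchMutate injects addPut alongside originalPut,
    // apply originalPut, then verify the dataInMemoryWithoutWAL counter grew by exactly 'delta'.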
2416    final long initSize = region.getDataInMemoryWithoutWAL();
    // Save the normal CP host and replace it with a mocked one
2418    RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
2419    RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
    // Because preBatchMutate returns void, we can't use the usual Mockito when(...).thenReturn(...)
    // form; use the doAnswer(...) form below instead (see the Mockito docs).
2422    Mockito.doAnswer(new Answer<Void>() {
2423      @Override
2424      public Void answer(InvocationOnMock invocation) throws Throwable {
2425        MiniBatchOperationInProgress<Mutation> mb = invocation.getArgument(0);
2426        mb.addOperationsFromCP(0, new Mutation[]{addPut});
2427        return null;
2428      }
2429    }).when(mockedCPHost).preBatchMutate(Mockito.isA(MiniBatchOperationInProgress.class));
2430    region.setCoprocessorHost(mockedCPHost);
2431    region.put(originalPut);
2432    region.setCoprocessorHost(normalCPHost);
2433    final long finalSize = region.getDataInMemoryWithoutWAL();
    assertEquals("finalSize:" + finalSize + ", initSize:" + initSize + ", delta:" + delta,
      initSize + delta, finalSize);
2436  }
2437
2438  @Test
2439  public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
2440    Delete delete = new Delete(row);
2441    delete.addColumns(fam1, qual1);
2442    doTestDelete_AndPostInsert(delete);
2443  }
2444
2445  @Test
2446  public void testaddFamily_PostInsert() throws IOException, InterruptedException {
2447    Delete delete = new Delete(row);
2448    delete.addFamily(fam1);
2449    doTestDelete_AndPostInsert(delete);
2450  }
2451
2452  public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
2453    this.region = initHRegion(tableName, method, CONF, fam1);
2454    EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2455    Put put = new Put(row);
2456    put.addColumn(fam1, qual1, value1);
2457    region.put(put);
2458
2459    // now delete the value:
2460    region.delete(delete);
2461
2462    // ok put data:
2463    put = new Put(row);
2464    put.addColumn(fam1, qual1, value2);
2465    region.put(put);
2466
2467    // ok get:
2468    Get get = new Get(row);
2469    get.addColumn(fam1, qual1);
2470
2471    Result r = region.get(get);
2472    assertEquals(1, r.size());
2473    assertArrayEquals(value2, r.getValue(fam1, qual1));
2474
2475    // next:
2476    Scan scan = new Scan(row);
2477    scan.addColumn(fam1, qual1);
2478    InternalScanner s = region.getScanner(scan);
2479
2480    List<Cell> results = new ArrayList<>();
2481    assertEquals(false, s.next(results));
2482    assertEquals(1, results.size());
2483    Cell kv = results.get(0);
2484
2485    assertArrayEquals(value2, CellUtil.cloneValue(kv));
2486    assertArrayEquals(fam1, CellUtil.cloneFamily(kv));
2487    assertArrayEquals(qual1, CellUtil.cloneQualifier(kv));
2488    assertArrayEquals(row, CellUtil.cloneRow(kv));
2489  }
2490
2491  @Test
2492  public void testDelete_CheckTimestampUpdated() throws IOException {
2493    byte[] row1 = Bytes.toBytes("row1");
2494    byte[] col1 = Bytes.toBytes("col1");
2495    byte[] col2 = Bytes.toBytes("col2");
2496    byte[] col3 = Bytes.toBytes("col3");
2497
2498    // Setting up region
2499    this.region = initHRegion(tableName, method, CONF, fam1);
2500    // Building checkerList
2501    List<Cell> kvs = new ArrayList<>();
2502    kvs.add(new KeyValue(row1, fam1, col1, null));
2503    kvs.add(new KeyValue(row1, fam1, col2, null));
2504    kvs.add(new KeyValue(row1, fam1, col3, null));
2505
2506    NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
2507    deleteMap.put(fam1, kvs);
2508    region.delete(deleteMap, Durability.SYNC_WAL);
2509
    // extract the key values out of the memstore:
2511    // This is kinda hacky, but better than nothing...
2512    long now = System.currentTimeMillis();
2513    AbstractMemStore memstore = (AbstractMemStore)region.getStore(fam1).memstore;
2514    Cell firstCell = memstore.getActive().first();
2515    assertTrue(firstCell.getTimestamp() <= now);
2516    now = firstCell.getTimestamp();
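    // Walk the rest of the active segment: no cell should carry a timestamp later than the
    // previous one, showing the delete's cells were stamped with real times rather than left at
    // LATEST_TIMESTAMP.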
2517    for (Cell cell : memstore.getActive().getCellSet()) {
2518      assertTrue(cell.getTimestamp() <= now);
2519      now = cell.getTimestamp();
2520    }
2521  }
2522
2523  // ////////////////////////////////////////////////////////////////////////////
2524  // Get tests
2525  // ////////////////////////////////////////////////////////////////////////////
2526  @Test
2527  public void testGet_FamilyChecker() throws IOException {
2528    byte[] row1 = Bytes.toBytes("row1");
2529    byte[] fam1 = Bytes.toBytes("fam1");
2530    byte[] fam2 = Bytes.toBytes("False");
2531    byte[] col1 = Bytes.toBytes("col1");
2532
2533    // Setting up region
2534    this.region = initHRegion(tableName, method, CONF, fam1);
2535    Get get = new Get(row1);
2536    get.addColumn(fam2, col1);
2537
2538    // Test
2539    try {
2540      region.get(get);
2541      fail("Expecting DoNotRetryIOException in get but did not get any");
2542    } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
2543      LOG.info("Got expected DoNotRetryIOException successfully");
2544    }
2545  }
2546
2547  @Test
2548  public void testGet_Basic() throws IOException {
2549    byte[] row1 = Bytes.toBytes("row1");
2550    byte[] fam1 = Bytes.toBytes("fam1");
2551    byte[] col1 = Bytes.toBytes("col1");
2552    byte[] col2 = Bytes.toBytes("col2");
2553    byte[] col3 = Bytes.toBytes("col3");
2554    byte[] col4 = Bytes.toBytes("col4");
2555    byte[] col5 = Bytes.toBytes("col5");
2556
2557    // Setting up region
2558    this.region = initHRegion(tableName, method, CONF, fam1);
2559    // Add to memstore
2560    Put put = new Put(row1);
2561    put.addColumn(fam1, col1, null);
2562    put.addColumn(fam1, col2, null);
2563    put.addColumn(fam1, col3, null);
2564    put.addColumn(fam1, col4, null);
2565    put.addColumn(fam1, col5, null);
2566    region.put(put);
2567
2568    Get get = new Get(row1);
2569    get.addColumn(fam1, col2);
2570    get.addColumn(fam1, col4);
2571    // Expected result
2572    KeyValue kv1 = new KeyValue(row1, fam1, col2);
2573    KeyValue kv2 = new KeyValue(row1, fam1, col4);
2574    KeyValue[] expected = { kv1, kv2 };
2575
2576    // Test
2577    Result res = region.get(get);
2578    assertEquals(expected.length, res.size());
2579    for (int i = 0; i < res.size(); i++) {
2580      assertTrue(CellUtil.matchingRows(expected[i], res.rawCells()[i]));
2581      assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i]));
2582      assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i]));
2583    }
2584
2585    // Test using a filter on a Get
2586    Get g = new Get(row1);
2587    final int count = 2;
2588    g.setFilter(new ColumnCountGetFilter(count));
2589    res = region.get(g);
2590    assertEquals(count, res.size());
2591  }
2592
2593  @Test
2594  public void testGet_Empty() throws IOException {
2595    byte[] row = Bytes.toBytes("row");
2596    byte[] fam = Bytes.toBytes("fam");
2597
2598    this.region = initHRegion(tableName, method, CONF, fam);
2599    Get get = new Get(row);
2600    get.addFamily(fam);
2601    Result r = region.get(get);
2602
2603    assertTrue(r.isEmpty());
2604  }
2605
2606  @Test
2607  public void testGetWithFilter() throws IOException, InterruptedException {
2608    byte[] row1 = Bytes.toBytes("row1");
2609    byte[] fam1 = Bytes.toBytes("fam1");
2610    byte[] col1 = Bytes.toBytes("col1");
2611    byte[] value1 = Bytes.toBytes("value1");
2612    byte[] value2 = Bytes.toBytes("value2");
2613
2614    final int maxVersions = 3;
2615    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
2616    hcd.setMaxVersions(maxVersions);
2617    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker"));
2618    htd.addFamily(hcd);
2619    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
2620    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
2621    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
2622    final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
2623    this.region = TEST_UTIL.createLocalHRegion(info, htd, wal);
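    // With maxVersions=3 on the family, only the three newest of the four puts below remain
    // visible to reads.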
2624
    // Put 4 versions to memstore
2626    long ts = 0;
2627    Put put = new Put(row1, ts);
2628    put.addColumn(fam1, col1, value1);
2629    region.put(put);
2630    put = new Put(row1, ts + 1);
2631    put.addColumn(fam1, col1, Bytes.toBytes("filter1"));
2632    region.put(put);
2633    put = new Put(row1, ts + 2);
2634    put.addColumn(fam1, col1, Bytes.toBytes("filter2"));
2635    region.put(put);
2636    put = new Put(row1, ts + 3);
2637    put.addColumn(fam1, col1, value2);
2638    region.put(put);
2639
2640    Get get = new Get(row1);
2641    get.readAllVersions();
2642    Result res = region.get(get);
2643    // Only 3 versions are returned; the oldest version is gone from the user view
2644    assertEquals(maxVersions, res.size());
2645
2646    get.setFilter(new ValueFilter(CompareOperator.EQUAL, new SubstringComparator("value")));
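    // The ValueFilter keeps only cells whose value contains "value". Of the three visible
    // versions (maxVersions = 3), only the newest one ("value2") matches, so a single cell
    // is expected.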
2647    res = region.get(get);
2648    // When using a value filter, the oldest version should still be gone from the user view
2649    // and only one KeyValue should be returned
2650    assertEquals(1, res.size());
2651    assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2652    assertEquals(ts + 3, res.rawCells()[0].getTimestamp());
2653
2654    region.flush(true);
2655    region.compact(true);
2656    Thread.sleep(1000);
2657    res = region.get(get);
2658    // After flush and compact, the result should be consistent with previous result
2659    assertEquals(1, res.size());
2660    assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
2661  }
2662
2663  // ////////////////////////////////////////////////////////////////////////////
2664  // Scanner tests
2665  // ////////////////////////////////////////////////////////////////////////////
2666  @Test
2667  public void testGetScanner_WithOkFamilies() throws IOException {
2668    byte[] fam1 = Bytes.toBytes("fam1");
2669    byte[] fam2 = Bytes.toBytes("fam2");
2670
2671    byte[][] families = { fam1, fam2 };
2672
2673    // Setting up region
2674    this.region = initHRegion(tableName, method, CONF, families);
2675    Scan scan = new Scan();
2676    scan.addFamily(fam1);
2677    scan.addFamily(fam2);
2678    try {
2679      region.getScanner(scan);
2680    } catch (Exception e) {
2681      fail("Families could not be found in Region");
2682    }
2683  }
2684
2685  @Test
2686  public void testGetScanner_WithNotOkFamilies() throws IOException {
2687    byte[] fam1 = Bytes.toBytes("fam1");
2688    byte[] fam2 = Bytes.toBytes("fam2");
2689
2690    byte[][] families = { fam1 };
2691
2692    // Setting up region
2693    this.region = initHRegion(tableName, method, CONF, families);
2694    Scan scan = new Scan();
2695    scan.addFamily(fam2);
2696    boolean ok = false;
2697    try {
2698      region.getScanner(scan);
2699    } catch (Exception e) {
2700      ok = true;
2701    }
2702    assertTrue("Expected an exception when scanning a family that is not in the region", ok);
2703  }
2704
2705  @Test
2706  public void testGetScanner_WithNoFamilies() throws IOException {
2707    byte[] row1 = Bytes.toBytes("row1");
2708    byte[] fam1 = Bytes.toBytes("fam1");
2709    byte[] fam2 = Bytes.toBytes("fam2");
2710    byte[] fam3 = Bytes.toBytes("fam3");
2711    byte[] fam4 = Bytes.toBytes("fam4");
2712
2713    byte[][] families = { fam1, fam2, fam3, fam4 };
2714
2715    // Setting up region
2716    this.region = initHRegion(tableName, method, CONF, families);
2717    // Putting data in Region
2718    Put put = new Put(row1);
2719    put.addColumn(fam1, null, null);
2720    put.addColumn(fam2, null, null);
2721    put.addColumn(fam3, null, null);
2722    put.addColumn(fam4, null, null);
2723    region.put(put);
2724
2725    Scan scan = null;
2726    HRegion.RegionScannerImpl is = null;
2727
2728    // Testing how many store scanners getScanner produces. The scanner heap
2729    // excludes the current scanner, so requesting two families should yield
2730    // a heap size of 2 - 1 = 1.
2731    scan = new Scan();
2732    scan.addFamily(fam2);
2733    scan.addFamily(fam4);
2734    is = region.getScanner(scan);
2735    assertEquals(1, is.storeHeap.getHeap().size());
2736
2737    scan = new Scan();
2738    is = region.getScanner(scan);
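    // Scanning all four families: the heap again excludes the current scanner, leaving
    // families.length - 1 = 3 store scanners in the heap.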
2739    assertEquals(families.length - 1, is.storeHeap.getHeap().size());
2740  }
2741
2742  /**
2743   * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
2744   *
2745   * @throws IOException
2746   */
2747  @Test
2748  public void testGetScanner_WithRegionClosed() throws IOException {
2749    byte[] fam1 = Bytes.toBytes("fam1");
2750    byte[] fam2 = Bytes.toBytes("fam2");
2751
2752    byte[][] families = { fam1, fam2 };
2753
2754    // Setting up region
2755    try {
2756      this.region = initHRegion(tableName, method, CONF, families);
2757    } catch (IOException e) {
2758      e.printStackTrace();
2759      fail("Got IOException during initHRegion, " + e.getMessage());
2760    }
2761    region.closed.set(true);
2762    try {
2763      region.getScanner(null);
2764      fail("Expected to get an exception during getScanner on a region that is closed");
2765    } catch (NotServingRegionException e) {
2766      // this is the correct exception that is expected
2767    } catch (IOException e) {
2768      fail("Got wrong type of exception - should be a NotServingRegionException, " +
2769          "but was an IOException: "
2770          + e.getMessage());
2771    }
2772  }
2773
2774  @Test
2775  public void testRegionScanner_Next() throws IOException {
2776    byte[] row1 = Bytes.toBytes("row1");
2777    byte[] row2 = Bytes.toBytes("row2");
2778    byte[] fam1 = Bytes.toBytes("fam1");
2779    byte[] fam2 = Bytes.toBytes("fam2");
2780    byte[] fam3 = Bytes.toBytes("fam3");
2781    byte[] fam4 = Bytes.toBytes("fam4");
2782
2783    byte[][] families = { fam1, fam2, fam3, fam4 };
2784    long ts = System.currentTimeMillis();
2785
2786    // Setting up region
2787    this.region = initHRegion(tableName, method, CONF, families);
2788    // Putting data in Region
2789    Put put = null;
2790    put = new Put(row1);
2791    put.addColumn(fam1, (byte[]) null, ts, null);
2792    put.addColumn(fam2, (byte[]) null, ts, null);
2793    put.addColumn(fam3, (byte[]) null, ts, null);
2794    put.addColumn(fam4, (byte[]) null, ts, null);
2795    region.put(put);
2796
2797    put = new Put(row2);
2798    put.addColumn(fam1, (byte[]) null, ts, null);
2799    put.addColumn(fam2, (byte[]) null, ts, null);
2800    put.addColumn(fam3, (byte[]) null, ts, null);
2801    put.addColumn(fam4, (byte[]) null, ts, null);
2802    region.put(put);
2803
2804    Scan scan = new Scan();
2805    scan.addFamily(fam2);
2806    scan.addFamily(fam4);
2807    InternalScanner is = region.getScanner(scan);
2808
2809    List<Cell> res = null;
2810
2811    // Result 1
2812    List<Cell> expected1 = new ArrayList<>();
2813    expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
2814    expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
2815
2816    res = new ArrayList<>();
2817    is.next(res);
2818    for (int i = 0; i < res.size(); i++) {
2819      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected1.get(i), res.get(i)));
2820    }
2821
2822    // Result 2
2823    List<Cell> expected2 = new ArrayList<>();
2824    expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
2825    expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
2826
2827    res = new ArrayList<>();
2828    is.next(res);
2829    for (int i = 0; i < res.size(); i++) {
2830      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected2.get(i), res.get(i)));
2831    }
2832  }
2833
2834  @Test
2835  public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException {
2836    byte[] row1 = Bytes.toBytes("row1");
2837    byte[] qf1 = Bytes.toBytes("qualifier1");
2838    byte[] qf2 = Bytes.toBytes("qualifier2");
2839    byte[] fam1 = Bytes.toBytes("fam1");
2840    byte[][] families = { fam1 };
2841
2842    long ts1 = System.currentTimeMillis();
2843    long ts2 = ts1 + 1;
2844    long ts3 = ts1 + 2;
2845
2846    // Setting up region
2847    this.region = initHRegion(tableName, method, CONF, families);
2848    // Putting data in Region
2849    Put put = null;
2850    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2851    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2852    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2853
2854    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2855    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2856    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2857
2858    put = new Put(row1);
2859    put.add(kv13);
2860    put.add(kv12);
2861    put.add(kv11);
2862    put.add(kv23);
2863    put.add(kv22);
2864    put.add(kv21);
2865    region.put(put);
2866
2867    // Expected
2868    List<Cell> expected = new ArrayList<>();
2869    expected.add(kv13);
2870    expected.add(kv12);
2871
2872    Scan scan = new Scan(row1);
2873    scan.addColumn(fam1, qf1);
2874    scan.setMaxVersions(MAX_VERSIONS);
2875    List<Cell> actual = new ArrayList<>();
2876    InternalScanner scanner = region.getScanner(scan);
2877
2878    boolean hasNext = scanner.next(actual);
2879    assertEquals(false, hasNext);
2880
2881    // Verify result
2882    for (int i = 0; i < expected.size(); i++) {
2883      assertEquals(expected.get(i), actual.get(i));
2884    }
2885  }
2886
2887  @Test
2888  public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException {
2889    byte[] row1 = Bytes.toBytes("row1");
2890    byte[] qf1 = Bytes.toBytes("qualifier1");
2891    byte[] qf2 = Bytes.toBytes("qualifier2");
2892    byte[] fam1 = Bytes.toBytes("fam1");
2893    byte[][] families = { fam1 };
2894
2895    long ts1 = 1; // System.currentTimeMillis();
2896    long ts2 = ts1 + 1;
2897    long ts3 = ts1 + 2;
2898
2899    // Setting up region
2900    this.region = initHRegion(tableName, method, CONF, families);
2901    // Putting data in Region
2902    Put put = null;
2903    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2904    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2905    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2906
2907    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2908    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2909    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2910
2911    put = new Put(row1);
2912    put.add(kv13);
2913    put.add(kv12);
2914    put.add(kv11);
2915    put.add(kv23);
2916    put.add(kv22);
2917    put.add(kv21);
2918    region.put(put);
2919    region.flush(true);
2920
2921    // Expected
2922    List<Cell> expected = new ArrayList<>();
2923    expected.add(kv13);
2924    expected.add(kv12);
2925    expected.add(kv23);
2926    expected.add(kv22);
2927
2928    Scan scan = new Scan(row1);
2929    scan.addColumn(fam1, qf1);
2930    scan.addColumn(fam1, qf2);
2931    scan.setMaxVersions(MAX_VERSIONS);
2932    List<Cell> actual = new ArrayList<>();
2933    InternalScanner scanner = region.getScanner(scan);
2934
2935    boolean hasNext = scanner.next(actual);
2936    assertEquals(false, hasNext);
2937
2938    // Verify result
2939    for (int i = 0; i < expected.size(); i++) {
2940      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
2941    }
2942  }
2943
2944  @Test
2945  public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws
2946      IOException {
2947    byte[] row1 = Bytes.toBytes("row1");
2948    byte[] fam1 = Bytes.toBytes("fam1");
2949    byte[][] families = { fam1 };
2950    byte[] qf1 = Bytes.toBytes("qualifier1");
2951    byte[] qf2 = Bytes.toBytes("qualifier2");
2952
2953    long ts1 = 1;
2954    long ts2 = ts1 + 1;
2955    long ts3 = ts1 + 2;
2956    long ts4 = ts1 + 3;
2957
2958    // Setting up region
2959    this.region = initHRegion(tableName, method, CONF, families);
2960    // Putting data in Region
2961    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
2962    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2963    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2964    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2965
2966    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
2967    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2968    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2969    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2970
2971    Put put = null;
2972    put = new Put(row1);
2973    put.add(kv14);
2974    put.add(kv24);
2975    region.put(put);
2976    region.flush(true);
2977
2978    put = new Put(row1);
2979    put.add(kv23);
2980    put.add(kv13);
2981    region.put(put);
2982    region.flush(true);
2983
2984    put = new Put(row1);
2985    put.add(kv22);
2986    put.add(kv12);
2987    region.put(put);
2988    region.flush(true);
2989
2990    put = new Put(row1);
2991    put.add(kv21);
2992    put.add(kv11);
2993    region.put(put);
2994
2995    // Expected
2996    List<Cell> expected = new ArrayList<>();
2997    expected.add(kv14);
2998    expected.add(kv13);
2999    expected.add(kv12);
3000    expected.add(kv24);
3001    expected.add(kv23);
3002    expected.add(kv22);
3003
3004    Scan scan = new Scan(row1);
3005    scan.addColumn(fam1, qf1);
3006    scan.addColumn(fam1, qf2);
3007    int versions = 3;
3008    scan.setMaxVersions(versions);
3009    List<Cell> actual = new ArrayList<>();
3010    InternalScanner scanner = region.getScanner(scan);
3011
3012    boolean hasNext = scanner.next(actual);
3013    assertEquals(false, hasNext);
3014
3015    // Verify result
3016    for (int i = 0; i < expected.size(); i++) {
3017      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3018    }
3019  }
3020
3021  @Test
3022  public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException {
3023    byte[] row1 = Bytes.toBytes("row1");
3024    byte[] qf1 = Bytes.toBytes("qualifier1");
3025    byte[] qf2 = Bytes.toBytes("qualifier2");
3026    byte[] fam1 = Bytes.toBytes("fam1");
3027    byte[][] families = { fam1 };
3028
3029    long ts1 = System.currentTimeMillis();
3030    long ts2 = ts1 + 1;
3031    long ts3 = ts1 + 2;
3032
3033    // Setting up region
3034    this.region = initHRegion(tableName, method, CONF, families);
3035    // Putting data in Region
3036    Put put = null;
3037    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3038    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3039    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3040
3041    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3042    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3043    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3044
3045    put = new Put(row1);
3046    put.add(kv13);
3047    put.add(kv12);
3048    put.add(kv11);
3049    put.add(kv23);
3050    put.add(kv22);
3051    put.add(kv21);
3052    region.put(put);
3053
3054    // Expected
3055    List<Cell> expected = new ArrayList<>();
3056    expected.add(kv13);
3057    expected.add(kv12);
3058    expected.add(kv23);
3059    expected.add(kv22);
3060
3061    Scan scan = new Scan(row1);
3062    scan.addFamily(fam1);
3063    scan.setMaxVersions(MAX_VERSIONS);
3064    List<Cell> actual = new ArrayList<>();
3065    InternalScanner scanner = region.getScanner(scan);
3066
3067    boolean hasNext = scanner.next(actual);
3068    assertEquals(false, hasNext);
3069
3070    // Verify result
3071    for (int i = 0; i < expected.size(); i++) {
3072      assertEquals(expected.get(i), actual.get(i));
3073    }
3074  }
3075
3076  @Test
3077  public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException {
3078    byte[] row1 = Bytes.toBytes("row1");
3079    byte[] qf1 = Bytes.toBytes("qualifier1");
3080    byte[] qf2 = Bytes.toBytes("qualifier2");
3081    byte[] fam1 = Bytes.toBytes("fam1");
3082
3083    long ts1 = 1; // System.currentTimeMillis();
3084    long ts2 = ts1 + 1;
3085    long ts3 = ts1 + 2;
3086
3087    // Setting up region
3088    this.region = initHRegion(tableName, method, CONF, fam1);
3089    // Putting data in Region
3090    Put put = null;
3091    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3092    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3093    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3094
3095    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3096    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3097    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3098
3099    put = new Put(row1);
3100    put.add(kv13);
3101    put.add(kv12);
3102    put.add(kv11);
3103    put.add(kv23);
3104    put.add(kv22);
3105    put.add(kv21);
3106    region.put(put);
3107    region.flush(true);
3108
3109    // Expected
3110    List<Cell> expected = new ArrayList<>();
3111    expected.add(kv13);
3112    expected.add(kv12);
3113    expected.add(kv23);
3114    expected.add(kv22);
3115
3116    Scan scan = new Scan(row1);
3117    scan.addFamily(fam1);
3118    scan.setMaxVersions(MAX_VERSIONS);
3119    List<Cell> actual = new ArrayList<>();
3120    InternalScanner scanner = region.getScanner(scan);
3121
3122    boolean hasNext = scanner.next(actual);
3123    assertEquals(false, hasNext);
3124
3125    // Verify result
3126    for (int i = 0; i < expected.size(); i++) {
3127      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3128    }
3129  }
3130
3131  @Test
3132  public void testScanner_StopRow1542() throws IOException {
3133    byte[] family = Bytes.toBytes("testFamily");
3134    this.region = initHRegion(tableName, method, CONF, family);
3135    byte[] row1 = Bytes.toBytes("row111");
3136    byte[] row2 = Bytes.toBytes("row222");
3137    byte[] row3 = Bytes.toBytes("row333");
3138    byte[] row4 = Bytes.toBytes("row444");
3139    byte[] row5 = Bytes.toBytes("row555");
3140
3141    byte[] col1 = Bytes.toBytes("Pub111");
3142    byte[] col2 = Bytes.toBytes("Pub222");
3143
3144    Put put = new Put(row1);
3145    put.addColumn(family, col1, Bytes.toBytes(10L));
3146    region.put(put);
3147
3148    put = new Put(row2);
3149    put.addColumn(family, col1, Bytes.toBytes(15L));
3150    region.put(put);
3151
3152    put = new Put(row3);
3153    put.addColumn(family, col2, Bytes.toBytes(20L));
3154    region.put(put);
3155
3156    put = new Put(row4);
3157    put.addColumn(family, col2, Bytes.toBytes(30L));
3158    region.put(put);
3159
3160    put = new Put(row5);
3161    put.addColumn(family, col1, Bytes.toBytes(40L));
3162    region.put(put);
3163
3164    Scan scan = new Scan(row3, row4);
3165    scan.setMaxVersions();
3166    scan.addColumn(family, col1);
3167    InternalScanner s = region.getScanner(scan);
3168
3169    List<Cell> results = new ArrayList<>();
3170    assertEquals(false, s.next(results));
3171    assertEquals(0, results.size());
3172  }
3173
3174  @Test
3175  public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
3176    byte[] row1 = Bytes.toBytes("row1");
3177    byte[] fam1 = Bytes.toBytes("fam1");
3178    byte[] qf1 = Bytes.toBytes("qualifier1");
3179    byte[] qf2 = Bytes.toBytes("qualifier2");
3180
3181    long ts1 = 1;
3182    long ts2 = ts1 + 1;
3183    long ts3 = ts1 + 2;
3184    long ts4 = ts1 + 3;
3185
3186    // Setting up region
3187    this.region = initHRegion(tableName, method, CONF, fam1);
3188    // Putting data in Region
3189    KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3190    KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3191    KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3192    KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3193
3194    KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3195    KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3196    KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3197    KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3198
3199    Put put = null;
3200    put = new Put(row1);
3201    put.add(kv14);
3202    put.add(kv24);
3203    region.put(put);
3204    region.flush(true);
3205
3206    put = new Put(row1);
3207    put.add(kv23);
3208    put.add(kv13);
3209    region.put(put);
3210    region.flush(true);
3211
3212    put = new Put(row1);
3213    put.add(kv22);
3214    put.add(kv12);
3215    region.put(put);
3216    region.flush(true);
3217
3218    put = new Put(row1);
3219    put.add(kv21);
3220    put.add(kv11);
3221    region.put(put);
3222
3223    // Expected
3224    List<KeyValue> expected = new ArrayList<>();
3225    expected.add(kv14);
3226    expected.add(kv13);
3227    expected.add(kv12);
3228    expected.add(kv24);
3229    expected.add(kv23);
3230    expected.add(kv22);
3231
3232    Scan scan = new Scan(row1);
3233    int versions = 3;
3234    scan.setMaxVersions(versions);
3235    List<Cell> actual = new ArrayList<>();
3236    InternalScanner scanner = region.getScanner(scan);
3237
3238    boolean hasNext = scanner.next(actual);
3239    assertEquals(false, hasNext);
3240
3241    // Verify result
3242    for (int i = 0; i < expected.size(); i++) {
3243      assertTrue(PrivateCellUtil.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3244    }
3245  }
3246
3247  /**
3248   * Added for HBASE-5416
3249   *
3250   * Here we test the scan optimization used when only a subset of column families is
3251   * referenced in filter conditions.
3252   */
3253  @Test
3254  public void testScanner_JoinedScanners() throws IOException {
3255    byte[] cf_essential = Bytes.toBytes("essential");
3256    byte[] cf_joined = Bytes.toBytes("joined");
3257    byte[] cf_alpha = Bytes.toBytes("alpha");
3258    this.region = initHRegion(tableName, method, CONF, cf_essential, cf_joined, cf_alpha);
3259    byte[] row1 = Bytes.toBytes("row1");
3260    byte[] row2 = Bytes.toBytes("row2");
3261    byte[] row3 = Bytes.toBytes("row3");
3262
3263    byte[] col_normal = Bytes.toBytes("d");
3264    byte[] col_alpha = Bytes.toBytes("a");
3265
3266    byte[] filtered_val = Bytes.toBytes(3);
3267
3268    Put put = new Put(row1);
3269    put.addColumn(cf_essential, col_normal, Bytes.toBytes(1));
3270    put.addColumn(cf_joined, col_alpha, Bytes.toBytes(1));
3271    region.put(put);
3272
3273    put = new Put(row2);
3274    put.addColumn(cf_essential, col_alpha, Bytes.toBytes(2));
3275    put.addColumn(cf_joined, col_normal, Bytes.toBytes(2));
3276    put.addColumn(cf_alpha, col_alpha, Bytes.toBytes(2));
3277    region.put(put);
3278
3279    put = new Put(row3);
3280    put.addColumn(cf_essential, col_normal, filtered_val);
3281    put.addColumn(cf_joined, col_normal, filtered_val);
3282    region.put(put);
3283
3284    // Check two things:
3285    // 1. result list contains expected values
3286    // 2. result list is sorted properly
3287
3288    Scan scan = new Scan();
3289    Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
3290            CompareOperator.NOT_EQUAL, filtered_val);
3291    scan.setFilter(filter);
3292    scan.setLoadColumnFamiliesOnDemand(true);
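    // With on-demand column family loading, only the "essential" family referenced by the
    // filter is scanned up front; the other families are loaded lazily for rows that pass
    // the filter (the HBASE-5416 joined-scanner optimization).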
3293    InternalScanner s = region.getScanner(scan);
3294
3295    List<Cell> results = new ArrayList<>();
3296    assertTrue(s.next(results));
3297    assertEquals(1, results.size());
3298    results.clear();
3299
3300    assertTrue(s.next(results));
3301    assertEquals(3, results.size());
3302    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha));
3303    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential));
3304    assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined));
3305    results.clear();
3306
3307    assertFalse(s.next(results));
3308    assertEquals(0, results.size());
3309  }
3310
3311  /**
3312   * HBASE-5416
3313   *
3314   * Test case where the scan limits the number of KVs returned on each next() call.
3315   */
3316  @Test
3317  public void testScanner_JoinedScannersWithLimits() throws IOException {
3318    final byte[] cf_first = Bytes.toBytes("first");
3319    final byte[] cf_second = Bytes.toBytes("second");
3320
3321    this.region = initHRegion(tableName, method, CONF, cf_first, cf_second);
3322    final byte[] col_a = Bytes.toBytes("a");
3323    final byte[] col_b = Bytes.toBytes("b");
3324
3325    Put put;
3326
3327    for (int i = 0; i < 10; i++) {
3328      put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
3329      put.addColumn(cf_first, col_a, Bytes.toBytes(i));
3330      if (i < 5) {
3331        put.addColumn(cf_first, col_b, Bytes.toBytes(i));
3332        put.addColumn(cf_second, col_a, Bytes.toBytes(i));
3333        put.addColumn(cf_second, col_b, Bytes.toBytes(i));
3334      }
3335      region.put(put);
3336    }
3337
3338    Scan scan = new Scan();
3339    scan.setLoadColumnFamiliesOnDemand(true);
3340    Filter bogusFilter = new FilterBase() {
3341      @Override
3342      public ReturnCode filterCell(final Cell ignored) throws IOException {
3343        return ReturnCode.INCLUDE;
3344      }
3345      @Override
3346      public boolean isFamilyEssential(byte[] name) {
3347        return Bytes.equals(name, cf_first);
3348      }
3349    };
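    // The filter accepts every cell but marks only cf_first as essential, pushing cf_second
    // onto the lazily loaded (joined) side of the scanner.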
3350
3351    scan.setFilter(bogusFilter);
3352    InternalScanner s = region.getScanner(scan);
3353
3354    // Our data looks like this:
3355    // r0: first:a, first:b, second:a, second:b
3356    // r1: first:a, first:b, second:a, second:b
3357    // r2: first:a, first:b, second:a, second:b
3358    // r3: first:a, first:b, second:a, second:b
3359    // r4: first:a, first:b, second:a, second:b
3360    // r5: first:a
3361    // r6: first:a
3362    // r7: first:a
3363    // r8: first:a
3364    // r9: first:a
3365
3366    // But due to next's limit set to 3, we should get this:
3367    // r0: first:a, first:b, second:a
3368    // r0: second:b
3369    // r1: first:a, first:b, second:a
3370    // r1: second:b
3371    // r2: first:a, first:b, second:a
3372    // r2: second:b
3373    // r3: first:a, first:b, second:a
3374    // r3: second:b
3375    // r4: first:a, first:b, second:a
3376    // r4: second:b
3377    // r5: first:a
3378    // r6: first:a
3379    // r7: first:a
3380    // r8: first:a
3381    // r9: first:a
3382
3383    List<Cell> results = new ArrayList<>();
3384    int index = 0;
3385    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build();
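    // The batch limit caps the number of cells a single next() call may return, so each
    // 4-cell row above should arrive as a 3-cell batch followed by a 1-cell batch.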
3386    while (true) {
3387      boolean more = s.next(results, scannerContext);
3388      if ((index >> 1) < 5) {
3389        if (index % 2 == 0) {
3390          assertEquals(3, results.size());
3391        } else {
3392          assertEquals(1, results.size());
3393        }
3394      } else {
3395        assertEquals(1, results.size());
3396      }
3397      results.clear();
3398      index++;
3399      if (!more) {
3400        break;
3401      }
3402    }
3403  }
3404
3405  /**
3406   * Write an HFile block full of Cells whose qualifiers are identical between offsets
3407   * 0 and Short.MAX_VALUE and differ only past that point. See HBASE-13329.
3408   * @throws Exception
3409   */
3410  @Test
3411  public void testLongQualifier() throws Exception {
3412    byte[] family = Bytes.toBytes("family");
3413    this.region = initHRegion(tableName, method, CONF, family);
3414    byte[] q = new byte[Short.MAX_VALUE+2];
3415    Arrays.fill(q, 0, q.length-1, (byte)42);
3416    for (byte i=0; i<10; i++) {
3417      Put p = new Put(Bytes.toBytes("row"));
3418      // qualifiers that differ past Short.MAX_VALUE
3419      q[q.length-1]=i;
3420      p.addColumn(family, q, q);
3421      region.put(p);
3422    }
3423    region.flush(false);
3424  }
3425
3426  /**
3427   * Flushes the cache in a thread while scanning. The tests verify that the
3428   * scan is coherent - e.g. the returned results are always from the same or a
3429   * later update than the previous results.
3430   *
3431   * @throws IOException
3432   *           scan / compact
3433   * @throws InterruptedException
3434   *           thread join
3435   */
3436  @Test
3437  public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
3438    byte[] family = Bytes.toBytes("family");
3439    int numRows = 1000;
3440    int flushAndScanInterval = 10;
3441    int compactInterval = 10 * flushAndScanInterval;
3442
3443    this.region = initHRegion(tableName, method, CONF, family);
3444    FlushThread flushThread = new FlushThread();
3445    try {
3446      flushThread.start();
3447
3448      Scan scan = new Scan();
3449      scan.addFamily(family);
3450      scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOperator.EQUAL,
3451          new BinaryComparator(Bytes.toBytes(5L))));
3452
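      // Only rows where i % 10 == 5 carry the value 5L in qual1 and pass the filter;
      // expectedCount tracks how many such rows have been written so far.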
3453      int expectedCount = 0;
3454      List<Cell> res = new ArrayList<>();
3455
3456      boolean toggle = true;
3457      for (long i = 0; i < numRows; i++) {
3458        Put put = new Put(Bytes.toBytes(i));
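        // Skip the WAL to keep this tight put loop fast; WAL durability is not under test here.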
3459        put.setDurability(Durability.SKIP_WAL);
3460        put.addColumn(family, qual1, Bytes.toBytes(i % 10));
3461        region.put(put);
3462
3463        if (i != 0 && i % compactInterval == 0) {
3464          LOG.debug("iteration = " + i+ " ts="+System.currentTimeMillis());
3465          region.compact(true);
3466        }
3467
3468        if (i % 10 == 5L) {
3469          expectedCount++;
3470        }
3471
3472        if (i != 0 && i % flushAndScanInterval == 0) {
3473          res.clear();
3474          InternalScanner scanner = region.getScanner(scan);
3475          if (toggle) {
3476            flushThread.flush();
3477          }
3478          while (scanner.next(res))
3479            ;
3480          if (!toggle) {
3481            flushThread.flush();
3482          }
3483          assertEquals("toggle=" + toggle + " i=" + i + " ts=" + System.currentTimeMillis(),
3484              expectedCount, res.size());
3485          toggle = !toggle;
3486        }
3487      }
3488
3489    } finally {
3490      try {
3491        flushThread.done();
3492        flushThread.join();
3493        flushThread.checkNoError();
3494      } catch (InterruptedException ie) {
3495        LOG.warn("Caught exception when joining with flushThread", ie);
3496      }
3497      HBaseTestingUtility.closeRegionAndWAL(this.region);
3498      this.region = null;
3499    }
3500  }
3501
3502  protected class FlushThread extends Thread {
3503    private volatile boolean done;
3504    private Throwable error = null;
3505
3506    FlushThread() {
3507      super("FlushThread");
3508    }
3509
3510    public void done() {
3511      done = true;
3512      synchronized (this) {
3513        interrupt();
3514      }
3515    }
3516
3517    public void checkNoError() {
3518      if (error != null) {
3519        assertNull(error);
3520      }
3521    }
3522
3523    @Override
3524    public void run() {
3525      done = false;
3526      while (!done) {
3527        synchronized (this) {
3528          try {
3529            wait();
3530          } catch (InterruptedException ignored) {
3531            if (done) {
3532              break;
3533            }
3534          }
3535        }
3536        try {
3537          region.flush(true);
3538        } catch (IOException e) {
3539          if (!done) {
3540            LOG.error("Error while flushing cache", e);
3541            error = e;
3542          }
3543          break;
3544        } catch (Throwable t) {
3545          LOG.error("Uncaught exception", t);
3546          throw t;
3547        }
3548      }
3549    }
3550
3551    public void flush() {
3552      synchronized (this) {
3553        notify();
3554      }
3555    }
3556  }
3557
3558  /**
3559   * Writes very wide records and scans for the latest every time. Flushes and
3560   * compacts the region every now and then to keep things realistic.
3561   *
3562   * @throws IOException
3563   *           by flush / scan / compaction
3564   * @throws InterruptedException
3565   *           when joining threads
3566   */
3567  @Test
3568  public void testWritesWhileScanning() throws IOException, InterruptedException {
3569    int testCount = 100;
3570    int numRows = 1;
3571    int numFamilies = 10;
3572    int numQualifiers = 100;
3573    int flushInterval = 7;
3574    int compactInterval = 5 * flushInterval;
3575    byte[][] families = new byte[numFamilies][];
3576    for (int i = 0; i < numFamilies; i++) {
3577      families[i] = Bytes.toBytes("family" + i);
3578    }
3579    byte[][] qualifiers = new byte[numQualifiers][];
3580    for (int i = 0; i < numQualifiers; i++) {
3581      qualifiers[i] = Bytes.toBytes("qual" + i);
3582    }
3583
3584    this.region = initHRegion(tableName, method, CONF, families);
3585    FlushThread flushThread = new FlushThread();
3586    PutThread putThread = new PutThread(numRows, families, qualifiers);
3587    try {
3588      putThread.start();
3589      putThread.waitForFirstPut();
3590
3591      flushThread.start();
3592
3593      Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
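      // The stop row of a Scan is exclusive, so this covers exactly the single row ("row0")
      // that PutThread keeps rewriting.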
3594
3595      int expectedCount = numFamilies * numQualifiers;
3596      List<Cell> res = new ArrayList<>();
3597
3598      long prevTimestamp = 0L;
3599      for (int i = 0; i < testCount; i++) {
3600
3601        if (i != 0 && i % compactInterval == 0) {
3602          region.compact(true);
3603          for (HStore store : region.getStores()) {
3604            store.closeAndArchiveCompactedFiles();
3605          }
3606        }
3607
3608        if (i != 0 && i % flushInterval == 0) {
3609          flushThread.flush();
3610        }
3611
3612        boolean previousEmpty = res.isEmpty();
3613        res.clear();
3614        try (InternalScanner scanner = region.getScanner(scan)) {
3615          boolean moreRows;
3616          do {
3617            moreRows = scanner.next(res);
3618          } while (moreRows);
3619        }
3620        if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
3621          assertEquals("i=" + i, expectedCount, res.size());
3622          long timestamp = res.get(0).getTimestamp();
3623          assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
3624              timestamp >= prevTimestamp);
3625          prevTimestamp = timestamp;
3626        }
3627      }
3628
3629      putThread.done();
3630
3631      region.flush(true);
3632
3633    } finally {
3634      try {
3635        flushThread.done();
3636        flushThread.join();
3637        flushThread.checkNoError();
3638
3639        putThread.join();
3640        putThread.checkNoError();
3641      } catch (InterruptedException ie) {
3642        LOG.warn("Caught exception when joining with flushThread", ie);
3643      }
3644
3645      try {
3646        HBaseTestingUtility.closeRegionAndWAL(this.region);
3647      } catch (DroppedSnapshotException dse) {
3648        // We could get this on way out because we interrupt the background flusher and it could
3649        // fail anywhere causing a DSE over in the background flusher... only it is not properly
3650        // dealt with so could still be memory hanging out when we get to here -- memory we can't
3651        // flush because the accounting is 'off' since original DSE.
3652      }
3653      this.region = null;
3654    }
3655  }
3656
3657  protected class PutThread extends Thread {
3658    private volatile boolean done;
3659    private volatile int numPutsFinished = 0;
3660
3661    private Throwable error = null;
3662    private int numRows;
3663    private byte[][] families;
3664    private byte[][] qualifiers;
3665
3666    private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
3667      super("PutThread");
3668      this.numRows = numRows;
3669      this.families = families;
3670      this.qualifiers = qualifiers;
3671    }
3672
3673    /**
3674     * Block calling thread until this instance of PutThread has put at least one row.
3675     */
3676    public void waitForFirstPut() throws InterruptedException {
3677      // wait until put thread actually puts some data
3678      while (isAlive() && numPutsFinished == 0) {
3679        checkNoError();
3680        Thread.sleep(50);
3681      }
3682    }
3683
3684    public void done() {
3685      done = true;
3686      synchronized (this) {
3687        interrupt();
3688      }
3689    }
3690
3691    public void checkNoError() {
3692      if (error != null) {
3693        assertNull(error);
3694      }
3695    }
3696
3697    @Override
3698    public void run() {
3699      done = false;
3700      while (!done) {
3701        try {
3702          for (int r = 0; r < numRows; r++) {
3703            byte[] row = Bytes.toBytes("row" + r);
3704            Put put = new Put(row);
3705            put.setDurability(Durability.SKIP_WAL);
3706            byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
3707            for (byte[] family : families) {
3708              for (byte[] qualifier : qualifiers) {
3709                put.addColumn(family, qualifier, numPutsFinished, value);
3710              }
3711            }
3712            region.put(put);
3713            numPutsFinished++;
3714            if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
3715              LOG.debug("put iteration = {}", numPutsFinished);
3716              Delete delete = new Delete(row, (long) numPutsFinished - 30);
3717              region.delete(delete);
3718            }
3719            numPutsFinished++;
3720          }
3721        } catch (InterruptedIOException e) {
3722          // This is fine. It means we are done, or didn't get the lock on time
3723          LOG.info("Interrupted", e);
3724        } catch (IOException e) {
3725          LOG.error("Error while putting records", e);
3726          error = e;
3727          break;
3728        }
3729      }
3730
3731    }
3732
3733  }
3734
3735  /**
3736   * Writes very wide records and gets the latest row every time. Flushes and
3737   * compacts the region aggressively to catch issues.
3738   *
3739   * @throws IOException
3740   *           by flush / scan / compaction
3741   * @throws InterruptedException
3742   *           when joining threads
3743   */
3744  @Test
3745  public void testWritesWhileGetting() throws Exception {
3746    int testCount = 50;
3747    int numRows = 1;
3748    int numFamilies = 10;
3749    int numQualifiers = 100;
3750    int compactInterval = 100;
3751    byte[][] families = new byte[numFamilies][];
3752    for (int i = 0; i < numFamilies; i++) {
3753      families[i] = Bytes.toBytes("family" + i);
3754    }
3755    byte[][] qualifiers = new byte[numQualifiers][];
3756    for (int i = 0; i < numQualifiers; i++) {
3757      qualifiers[i] = Bytes.toBytes("qual" + i);
3758    }
3759
3760
3761    // This test flushes constantly and can cause many files to be created, possibly
3762    // extending over the ulimit. Make sure compactions are aggressive in reducing
3763    // the number of HFiles created.
3766    Configuration conf = HBaseConfiguration.create(CONF);
3767    conf.setInt("hbase.hstore.compaction.min", 1);
3768    conf.setInt("hbase.hstore.compaction.max", 1000);
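    // hbase.hstore.compaction.min/max bound how many store files a single minor compaction
    // may select; widening that range lets compactions absorb files roughly as fast as the
    // flush thread creates them.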
3769    this.region = initHRegion(tableName, method, conf, families);
3770    PutThread putThread = null;
3771    MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
3772    try {
3773      putThread = new PutThread(numRows, families, qualifiers);
3774      putThread.start();
3775      putThread.waitForFirstPut();
3776
3777      // Add a thread that flushes as fast as possible
3778      ctx.addThread(new RepeatingTestThread(ctx) {
3779
3780        @Override
3781        public void doAnAction() throws Exception {
3782          region.flush(true);
3783          // Compact regularly to avoid creating too many files and exceeding
3784          // the ulimit.
3785          region.compact(false);
3786          for (HStore store : region.getStores()) {
3787            store.closeAndArchiveCompactedFiles();
3788          }
3789        }
3790      });
3791      ctx.startThreads();
3792
3793      Get get = new Get(Bytes.toBytes("row0"));
3794      Result result = null;
3795
3796      int expectedCount = numFamilies * numQualifiers;
3797
3798      long prevTimestamp = 0L;
3799      for (int i = 0; i < testCount; i++) {
3800        LOG.info("testWritesWhileGetting verify turn " + i);
3801        boolean previousEmpty = result == null || result.isEmpty();
3802        result = region.get(get);
3803        if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
3804          assertEquals("i=" + i, expectedCount, result.size());
3805          // TODO this was removed, now what dangit?!
3806          // search looking for the qualifier in question?
3807          long timestamp = 0;
3808          for (Cell kv : result.rawCells()) {
3809            if (CellUtil.matchingFamily(kv, families[0])
3810                && CellUtil.matchingQualifier(kv, qualifiers[0])) {
3811              timestamp = kv.getTimestamp();
3812            }
3813          }
3814          assertTrue(timestamp >= prevTimestamp);
3815          prevTimestamp = timestamp;
3816          Cell previousKV = null;
3817
3818          for (Cell kv : result.rawCells()) {
3819            byte[] thisValue = CellUtil.cloneValue(kv);
3820            if (previousKV != null) {
3821              if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) {
3822                LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV
3823                    + "(memStoreTS:" + previousKV.getSequenceId() + ")" + ", New KV: " + kv
3824                    + "(memStoreTS:" + kv.getSequenceId() + ")");
3825                assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue));
3826              }
3827            }
3828            previousKV = kv;
3829          }
3830        }
3831      }
3832    } finally {
3833      if (putThread != null)
3834        putThread.done();
3835
3836      region.flush(true);
3837
3838      if (putThread != null) {
3839        putThread.join();
3840        putThread.checkNoError();
3841      }
3842
3843      ctx.stop();
3844      HBaseTestingUtility.closeRegionAndWAL(this.region);
3845      this.region = null;
3846    }
3847  }
3848
3849  @Test
3850  public void testHolesInMeta() throws Exception {
3851    byte[] family = Bytes.toBytes("family");
3852    this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
3853        false, family);
3854    byte[] rowNotServed = Bytes.toBytes("a");
3855    Get g = new Get(rowNotServed);
3856    try {
3857      region.get(g);
3858      fail();
3859    } catch (WrongRegionException x) {
3860      // OK
3861    }
3862    byte[] row = Bytes.toBytes("y");
3863    g = new Get(row);
3864    region.get(g);
3865  }
3866
3867  @Test
3868  public void testIndexesScanWithOneDeletedRow() throws IOException {
3869    byte[] family = Bytes.toBytes("family");
3870
3871    // Setting up region
3872    this.region = initHRegion(tableName, method, CONF, family);
3873    Put put = new Put(Bytes.toBytes(1L));
3874    put.addColumn(family, qual1, 1L, Bytes.toBytes(1L));
3875    region.put(put);
3876
3877    region.flush(true);
3878
3879    Delete delete = new Delete(Bytes.toBytes(1L), 1L);
3880    region.delete(delete);
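    // The delete at timestamp 1 masks the flushed cell for row 1, so only the cell written
    // below for row 2 should survive the filtered scan.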
3881
3882    put = new Put(Bytes.toBytes(2L));
3883    put.addColumn(family, qual1, 2L, Bytes.toBytes(2L));
3884    region.put(put);
3885
3886    Scan idxScan = new Scan();
3887    idxScan.addFamily(family);
3888    idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList(
3889        new SingleColumnValueFilter(family, qual1, CompareOperator.GREATER_OR_EQUAL,
3890            new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1,
3891                    CompareOperator.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L))))));
3892    InternalScanner scanner = region.getScanner(idxScan);
3893    List<Cell> res = new ArrayList<>();
3894
3895    while (scanner.next(res)) {
3896      // Ignore res value.
3897    }
3898    assertEquals(1L, res.size());
3899  }
3900
3901  // ////////////////////////////////////////////////////////////////////////////
3902  // Bloom filter test
3903  // ////////////////////////////////////////////////////////////////////////////
3904  @Test
3905  public void testBloomFilterSize() throws IOException {
3906    byte[] fam1 = Bytes.toBytes("fam1");
3907    byte[] qf1 = Bytes.toBytes("col");
3908    byte[] val1 = Bytes.toBytes("value1");
3909    // Create Table
3910    HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE)
3911        .setBloomFilterType(BloomType.ROWCOL);
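    // With a ROWCOL bloom filter every distinct row+qualifier combination contributes one
    // bloom entry. All puts below reuse the same qualifier, so the expected filter entry
    // count equals the number of unique rows, while getEntries() counts every cell written.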
3912
3913    HTableDescriptor htd = new HTableDescriptor(tableName);
3914    htd.addFamily(hcd);
3915    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
3916    this.region = TEST_UTIL.createLocalHRegion(info, htd);
3917    int num_unique_rows = 10;
3918    int duplicate_multiplier = 2;
3919    int num_storefiles = 4;
3920
3921    int version = 0;
3922    for (int f = 0; f < num_storefiles; f++) {
3923      for (int i = 0; i < duplicate_multiplier; i++) {
3924        for (int j = 0; j < num_unique_rows; j++) {
3925          Put put = new Put(Bytes.toBytes("row" + j));
3926          put.setDurability(Durability.SKIP_WAL);
3927          long ts = version++;
3928          put.addColumn(fam1, qf1, ts, val1);
3929          region.put(put);
3930        }
3931      }
3932      region.flush(true);
3933    }
3934    // before compaction
3935    HStore store = region.getStore(fam1);
3936    Collection<HStoreFile> storeFiles = store.getStorefiles();
3937    for (HStoreFile storefile : storeFiles) {
3938      StoreFileReader reader = storefile.getReader();
3939      reader.loadFileInfo();
3940      reader.loadBloomfilter();
3941      assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
3942      assertEquals(num_unique_rows, reader.getFilterEntries());
3943    }
3944
3945    region.compact(true);
3946
3947    // after compaction
3948    storeFiles = store.getStorefiles();
3949    for (HStoreFile storefile : storeFiles) {
3950      StoreFileReader reader = storefile.getReader();
3951      reader.loadFileInfo();
3952      reader.loadBloomfilter();
3953      assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());
3954      assertEquals(num_unique_rows, reader.getFilterEntries());
3955    }
3956  }
3957
3958  @Test
3959  public void testAllColumnsWithBloomFilter() throws IOException {
3960    byte[] TABLE = Bytes.toBytes(name.getMethodName());
3961    byte[] FAMILY = Bytes.toBytes("family");
3962
3963    // Create table
3964    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE)
3965        .setBloomFilterType(BloomType.ROWCOL);
3966    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
3967    htd.addFamily(hcd);
3968    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
3969    this.region = TEST_UTIL.createLocalHRegion(info, htd);
3970    // For row:0, col:0: insert versions 1 through 4.
3971    byte[] row = Bytes.toBytes("row:" + 0);
3972    byte[] column = Bytes.toBytes("column:" + 0);
3973    Put put = new Put(row);
3974    put.setDurability(Durability.SKIP_WAL);
3975    for (long idx = 1; idx <= 4; idx++) {
3976      put.addColumn(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx));
3977    }
3978    region.put(put);
3979
3980    // Flush
3981    region.flush(true);
3982
3983    // Get rows
3984    Get get = new Get(row);
3985    get.readAllVersions();
3986    Cell[] kvs = region.get(get).rawCells();
3987
3988    // Check if rows are correct
3989    assertEquals(4, kvs.length);
3990    checkOneCell(kvs[0], FAMILY, 0, 0, 4);
3991    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
3992    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
3993    checkOneCell(kvs[3], FAMILY, 0, 0, 1);
3994  }
3995
3996  /**
3997   * Testcase to cover the bug-fix for HBASE-2823. Ensures a correct delete when
3998   * issuing a delete of a whole row on columns with the bloom filter set to row+col
3999   * (BloomType.ROWCOL).
4000   */
4001  @Test
4002  public void testDeleteRowWithBloomFilter() throws IOException {
4003    byte[] familyName = Bytes.toBytes("familyName");
4004
4005    // Create Table
4006    HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE)
4007        .setBloomFilterType(BloomType.ROWCOL);
4008
4009    HTableDescriptor htd = new HTableDescriptor(tableName);
4010    htd.addFamily(hcd);
4011    HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4012    this.region = TEST_UTIL.createLocalHRegion(info, htd);
4013    // Insert some data
4014    byte[] row = Bytes.toBytes("row1");
4015    byte[] col = Bytes.toBytes("col1");
4016
4017    Put put = new Put(row);
4018    put.addColumn(familyName, col, 1, Bytes.toBytes("SomeRandomValue"));
4019    region.put(put);
4020    region.flush(true);
4021
4022    Delete del = new Delete(row);
4023    region.delete(del);
4024    region.flush(true);
4025
4026    // Get remaining rows (should have none)
4027    Get get = new Get(row);
4028    get.addColumn(familyName, col);
4029
4030    Cell[] keyValues = region.get(get).rawCells();
4031    assertEquals(0, keyValues.length);
4032  }
4033
4034  @Test
4035  public void testgetHDFSBlocksDistribution() throws Exception {
4036    HBaseTestingUtility htu = new HBaseTestingUtility();
4037    // Why do we set the block size in this test?  If we set it smaller than the kvs, then we'll
4038    // break up the file in to more pieces that can be distributed across the three nodes and we
4039    // won't be able to have the condition this test asserts; that at least one node has
4040    // a copy of all replicas -- if small block size, then blocks are spread evenly across
4041    // the three nodes.  hfilev3 with tags seems to put us over the block size.  St.Ack.
4042    // final int DEFAULT_BLOCK_SIZE = 1024;
4043    // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
4044    htu.getConfiguration().setInt("dfs.replication", 2);
4045
4046    // set up a cluster with 3 nodes
4047    MiniHBaseCluster cluster = null;
4048    String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
4049    int regionServersCount = 3;
4050
4051    try {
4052      StartMiniClusterOption option = StartMiniClusterOption.builder()
4053          .numRegionServers(regionServersCount).dataNodeHosts(dataNodeHosts).build();
4054      cluster = htu.startMiniCluster(option);
4055      byte[][] families = { fam1, fam2 };
4056      Table ht = htu.createTable(tableName, families);
4057
4058      // Setting up region
4059      byte row[] = Bytes.toBytes("row1");
4060      byte col[] = Bytes.toBytes("col1");
4061
4062      Put put = new Put(row);
4063      put.addColumn(fam1, col, 1, Bytes.toBytes("test1"));
4064      put.addColumn(fam2, col, 1, Bytes.toBytes("test2"));
4065      ht.put(put);
4066
4067      HRegion firstRegion = htu.getHBaseCluster().getRegions(tableName).get(0);
4068      firstRegion.flush(true);
4069      HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution();
4070
4071      // Given the replication factor is 2 and we have 2 HFiles,
4072      // we will have a total of 4 block replicas on 3 datanodes; thus there
4073      // must be at least one host that has a replica for both HFiles. That host's
4074      // weight will be equal to the unique block weight.
4075      long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight();
4076      StringBuilder sb = new StringBuilder();
4077      for (String host: blocksDistribution1.getTopHosts()) {
4078        if (sb.length() > 0) sb.append(", ");
4079        sb.append(host);
4080        sb.append("=");
4081        sb.append(blocksDistribution1.getWeight(host));
4082      }
4083
4084      String topHost = blocksDistribution1.getTopHosts().get(0);
4085      long topHostWeight = blocksDistribution1.getWeight(topHost);
4086      String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" +
4087        topHostWeight + ", topHost=" + topHost + "; " + sb.toString();
4088      LOG.info(msg);
4089      assertTrue(msg, uniqueBlocksWeight1 == topHostWeight);
4090
4091      // use the static method to compute the value, it should be the same.
4092      // static method is used by load balancer or other components
4093      HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(
4094          htu.getConfiguration(), firstRegion.getTableDescriptor(), firstRegion.getRegionInfo());
4095      long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight();
4096
4097      assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
4098
4099      ht.close();
4100    } finally {
4101      if (cluster != null) {
4102        htu.shutdownMiniCluster();
4103      }
4104    }
4105  }
4106
4107  /**
4108   * Testcase to check that the state of the region initialization task is set to ABORTED
4109   * if any exception occurs during initialization.
4110   *
4111   * @throws Exception
4112   */
4113  @Test
4114  public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception {
4115    HRegionInfo info;
4116    try {
4117      FileSystem fs = Mockito.mock(FileSystem.class);
4118      Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException());
4119      HTableDescriptor htd = new HTableDescriptor(tableName);
4120      htd.addFamily(new HColumnDescriptor("cf"));
4121      info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
4122          HConstants.EMPTY_BYTE_ARRAY, false);
4123      Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
4124      region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
4125      // region initialization throws IOException and set task state to ABORTED.
4126      region.initialize();
4127      fail("Region initialization should fail due to IOException");
4128    } catch (IOException io) {
4129      List<MonitoredTask> tasks = TaskMonitor.get().getTasks();
4130      for (MonitoredTask monitoredTask : tasks) {
4131        if (!(monitoredTask instanceof MonitoredRPCHandler)
4132            && monitoredTask.getDescription().contains(region.toString())) {
4133          assertTrue("Region state should be ABORTED.",
4134              monitoredTask.getState().equals(MonitoredTask.State.ABORTED));
4135          break;
4136        }
4137      }
4138    }
4139  }
4140
4141  /**
4142   * Verifies that the .regioninfo file is written on region creation and that
4143   * is recreated if missing during region opening.
4144   * it is recreated if missing during region opening.
4145  @Test
4146  public void testRegionInfoFileCreation() throws IOException {
4147    Path rootDir = new Path(dir + "testRegionInfoFileCreation");
4148
4149    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4150    htd.addFamily(new HColumnDescriptor("cf"));
4151
4152    HRegionInfo hri = new HRegionInfo(htd.getTableName());
4153
4154    // Create a region and skip the initialization (like CreateTableHandler)
4155    region = HBaseTestingUtility.createRegionAndWAL(hri, rootDir, CONF, htd, false);
4156    Path regionDir = region.getRegionFileSystem().getRegionDir();
4157    FileSystem fs = region.getRegionFileSystem().getFileSystem();
4158    HBaseTestingUtility.closeRegionAndWAL(region);
4159
4160    Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
4161
4162    // Verify that the .regioninfo file is present
4163    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4164        fs.exists(regionInfoFile));
4165
4166    // Try to open the region
4167    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4168    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4169    HBaseTestingUtility.closeRegionAndWAL(region);
4170
4171    // Verify that the .regioninfo file is still there
4172    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4173        fs.exists(regionInfoFile));
4174
4175    // Remove the .regioninfo file and verify it is recreated on region open
4176    fs.delete(regionInfoFile, true);
4177    assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
4178        fs.exists(regionInfoFile));
4179
4180    region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4181//    region = TEST_UTIL.openHRegion(hri, htd);
4182    assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4183    HBaseTestingUtility.closeRegionAndWAL(region);
4184
4185    // Verify that the .regioninfo file is still there
4186    assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4187        fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE)));
4188
4189    region = null;
4190  }
4191
4192  /**
4193   * Runnable used by testParallelIncrementWithMemStoreFlush to repeatedly increment one cell.
4194   */
4195  private static class Incrementer implements Runnable {
4196    private HRegion region;
4197    private final static byte[] incRow = Bytes.toBytes("incRow");
4198    private final static byte[] family = Bytes.toBytes("family");
4199    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4200    private final static long ONE = 1L;
4201    private int incCounter;
4202
4203    public Incrementer(HRegion region, int incCounter) {
4204      this.region = region;
4205      this.incCounter = incCounter;
4206    }
4207
4208    @Override
4209    public void run() {
4210      int count = 0;
4211      while (count < incCounter) {
4212        Increment inc = new Increment(incRow);
4213        inc.addColumn(family, qualifier, ONE);
4214        count++;
4215        try {
4216          region.increment(inc);
4217        } catch (IOException e) {
4218          LOG.info("Count=" + count + ", " + e);
4219          break;
4220        }
4221      }
4222    }
4223  }
4224
4225  /**
4226   * Test case to check increment function with memstore flushing
4227   * @throws Exception
4228   */
4229  @Test
4230  public void testParallelIncrementWithMemStoreFlush() throws Exception {
4231    byte[] family = Incrementer.family;
4232    this.region = initHRegion(tableName, method, CONF, family);
4233    final HRegion region = this.region;
4234    final AtomicBoolean incrementDone = new AtomicBoolean(false);
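    // Flusher thread: keep flushing the memstore in a tight loop while the incrementer
    // threads run, so increments race with concurrent flushes.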
4235    Runnable flusher = new Runnable() {
4236      @Override
4237      public void run() {
4238        while (!incrementDone.get()) {
4239          try {
4240            region.flush(true);
4241          } catch (Exception e) {
4242            e.printStackTrace();
4243          }
4244        }
4245      }
4246    };
4247
4248    // after all increments finish, the counter will have been incremented to 20 * 100 = 2000
4249    int threadNum = 20;
4250    int incCounter = 100;
4251    long expected = (long) threadNum * incCounter;
4252    Thread[] incrementers = new Thread[threadNum];
4253    Thread flushThread = new Thread(flusher);
4254    for (int i = 0; i < threadNum; i++) {
4255      incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
4256      incrementers[i].start();
4257    }
4258    flushThread.start();
4259    for (int i = 0; i < threadNum; i++) {
4260      incrementers[i].join();
4261    }
4262
4263    incrementDone.set(true);
4264    flushThread.join();
4265
4266    Get get = new Get(Incrementer.incRow);
4267    get.addColumn(Incrementer.family, Incrementer.qualifier);
4268    get.readVersions(1);
4269    Result res = this.region.get(get);
4270    List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
4271
4272    // we just got the latest version
4273    assertEquals(1, kvs.size());
4274    Cell kv = kvs.get(0);
4275    assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
4276  }
4277
4278  /**
4279   * Runnable used by testParallelAppendWithMemStoreFlush to repeatedly append a byte to one cell.
4280   */
4281  private static class Appender implements Runnable {
4282    private HRegion region;
4283    private final static byte[] appendRow = Bytes.toBytes("appendRow");
4284    private final static byte[] family = Bytes.toBytes("family");
4285    private final static byte[] qualifier = Bytes.toBytes("qualifier");
4286    private final static byte[] CHAR = Bytes.toBytes("a");
4287    private int appendCounter;
4288
4289    public Appender(HRegion region, int appendCounter) {
4290      this.region = region;
4291      this.appendCounter = appendCounter;
4292    }
4293
4294    @Override
4295    public void run() {
4296      int count = 0;
4297      while (count < appendCounter) {
4298        Append app = new Append(appendRow);
4299        app.addColumn(family, qualifier, CHAR);
4300        count++;
4301        try {
4302          region.append(app);
4303        } catch (IOException e) {
4304          LOG.info("Count=" + count + ", max=" + appendCounter + ", " + e);
4305          break;
4306        }
4307      }
4308    }
4309  }
4310
4311  /**
4312   * Test case to check append function with memstore flushing
4313   * @throws Exception
4314   */
4315  @Test
4316  public void testParallelAppendWithMemStoreFlush() throws Exception {
4317    byte[] family = Appender.family;
4318    this.region = initHRegion(tableName, method, CONF, family);
4319    final HRegion region = this.region;
4320    final AtomicBoolean appendDone = new AtomicBoolean(false);
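    // As in the increment test above, flush the memstore in a loop while the appender
    // threads run, so appends race with concurrent flushes.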
4321    Runnable flusher = new Runnable() {
4322      @Override
4323      public void run() {
4324        while (!appendDone.get()) {
4325          try {
4326            region.flush(true);
4327          } catch (Exception e) {
4328            e.printStackTrace();
4329          }
4330        }
4331      }
4332    };
4333
4334    // After all appends finish, the value will be threadNum * appendCounter copies of
4335    // Appender.CHAR.
4336    int threadNum = 20;
4337    int appendCounter = 100;
4338    byte[] expected = new byte[threadNum * appendCounter];
4339    for (int i = 0; i < threadNum * appendCounter; i++) {
4340      System.arraycopy(Appender.CHAR, 0, expected, i, 1);
4341    }
4342    Thread[] appenders = new Thread[threadNum];
4343    Thread flushThread = new Thread(flusher);
4344    for (int i = 0; i < threadNum; i++) {
4345      appenders[i] = new Thread(new Appender(this.region, appendCounter));
4346      appenders[i].start();
4347    }
4348    flushThread.start();
4349    for (int i = 0; i < threadNum; i++) {
4350      appenders[i].join();
4351    }
4352
4353    appendDone.set(true);
4354    flushThread.join();
4355
4356    Get get = new Get(Appender.appendRow);
4357    get.addColumn(Appender.family, Appender.qualifier);
4358    get.readVersions(1);
4359    Result res = this.region.get(get);
4360    List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
4361
4362    // we just got the latest version
4363    assertEquals(1, kvs.size());
4364    Cell kv = kvs.get(0);
4365    byte[] appendResult = new byte[kv.getValueLength()];
4366    System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
4367    assertArrayEquals(expected, appendResult);
4368  }
4369
4370  /**
4371   * Test case to check put function with memstore flushing for same row, same ts
4372   * @throws Exception
4373   */
4374  @Test
4375  public void testPutWithMemStoreFlush() throws Exception {
4376    byte[] family = Bytes.toBytes("family");
4377    byte[] qualifier = Bytes.toBytes("qualifier");
4378    byte[] row = Bytes.toBytes("putRow");
4379    byte[] value = null;
4380    this.region = initHRegion(tableName, method, CONF, family);
4381    Put put = null;
4382    Get get = null;
4383    List<Cell> kvs = null;
4384    Result res = null;
4385
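    // Write an initial value and verify it is readable from the memstore.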
4386    put = new Put(row);
4387    value = Bytes.toBytes("value0");
4388    put.addColumn(family, qualifier, 1234567L, value);
4389    region.put(put);
4390    get = new Get(row);
4391    get.addColumn(family, qualifier);
4392    get.readAllVersions();
4393    res = this.region.get(get);
4394    kvs = res.getColumnCells(family, qualifier);
4395    assertEquals(1, kvs.size());
4396    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4397
4398    region.flush(true);
4399    get = new Get(row);
4400    get.addColumn(family, qualifier);
4401    get.readAllVersions();
4402    res = this.region.get(get);
4403    kvs = res.getColumnCells(family, qualifier);
4404    assertEquals(1, kvs.size());
4405    assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4406
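    // Overwrite the same cell (same row, qualifier and timestamp) with a new value; the new
    // value must be the one returned both before and after the flush below.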
4407    put = new Put(row);
4408    value = Bytes.toBytes("value1");
4409    put.addColumn(family, qualifier, 1234567L, value);
4410    region.put(put);
4411    get = new Get(row);
4412    get.addColumn(family, qualifier);
4413    get.readAllVersions();
4414    res = this.region.get(get);
4415    kvs = res.getColumnCells(family, qualifier);
4416    assertEquals(1, kvs.size());
4417    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4418
4419    region.flush(true);
4420    get = new Get(row);
4421    get.addColumn(family, qualifier);
4422    get.readAllVersions();
4423    res = this.region.get(get);
4424    kvs = res.getColumnCells(family, qualifier);
4425    assertEquals(1, kvs.size());
4426    assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4427  }
4428
4429  @Test
4430  public void testDurability() throws Exception {
4431    // there are 5 x 5 cases:
4432    // table durability (SYNC, FSYNC, ASYNC, SKIP, USE_DEFAULT) x mutation
4433    // durability (SYNC, FSYNC, ASYNC, SKIP, USE_DEFAULT)
4434
4435    // expected cases for append and sync wal
4436    durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4437    durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4438    durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4439
4440    durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4441    durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4442    durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4443
4444    durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4445    durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4446
4447    durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
4448    durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4449
4450    durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
4451    durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
4452    durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
4453
4454    // expected cases for async wal
4455    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4456    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4457    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4458    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4459    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
4460    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4461
4462    durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4463    durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4464    durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4465    durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4466    durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
4467    durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
4468
4469    // expected cases for skip wal
4470    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4471    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4472    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
4473    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
4474    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
4475    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
4476
4477  }
4478
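  /**
   * Opens a region with the given table durability, performs a single put with the given
   * mutation durability, and verifies against a spied WAL whether append and sync were invoked
   * as expected. {@code expectSyncFromLogSyncer} distinguishes syncs issued by the WAL's
   * background syncer (async wal cases) from syncs issued directly by the region, and
   * {@code timeout} bounds how long we wait for those background syncs.
   */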
4479  private void durabilityTest(String method, Durability tableDurability,
4480      Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
4481      final boolean expectSyncFromLogSyncer) throws Exception {
4482    Configuration conf = HBaseConfiguration.create(CONF);
4483    method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
4484    byte[] family = Bytes.toBytes("family");
4485    Path logDir = new Path(new Path(dir + method), "log");
4486    final Configuration walConf = new Configuration(conf);
4487    FSUtils.setRootDir(walConf, logDir);
4488    // XXX: The spied AsyncFSWAL cannot work properly because of a Mockito defect: Mockito
4489    // cannot deal with classes which have a field of an inner class. See discussions in HBASE-15536.
4490    walConf.set(WALFactory.WAL_PROVIDER, "filesystem");
4491    final WALFactory wals = new WALFactory(walConf, TEST_UTIL.getRandomUUID().toString());
4492    final WAL wal = spy(wals.getWAL(RegionInfoBuilder.newBuilder(tableName).build()));
4493    this.region = initHRegion(tableName, HConstants.EMPTY_START_ROW,
4494        HConstants.EMPTY_END_ROW, false, tableDurability, wal,
4495        new byte[][] { family });
4496
4497    Put put = new Put(Bytes.toBytes("r1"));
4498    put.addColumn(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
4499    put.setDurability(mutationDurability);
4500    region.put(put);
4501
4502    // verify whether append was called
4503    verify(wal, expectAppend ? times(1) : never())
4504      .append((HRegionInfo)any(), (WALKeyImpl)any(),
4505          (WALEdit)any(), Mockito.anyBoolean());
4506
4507    // verify sync called or not
4508    if (expectSync || expectSyncFromLogSyncer) {
4509      TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
4510        @Override
4511        public boolean evaluate() throws Exception {
4512          try {
4513            if (expectSync) {
4514              verify(wal, times(1)).sync(anyLong()); // Hregion calls this one
4515            } else if (expectSyncFromLogSyncer) {
4516              verify(wal, times(1)).sync(); // wal syncer calls this one
4517            }
4518          } catch (Throwable ignore) {
4519          }
4520          return true;
4521        }
4522      });
4523    } else {
4524      //verify(wal, never()).sync(anyLong());
4525      verify(wal, never()).sync();
4526    }
4527
4528    HBaseTestingUtility.closeRegionAndWAL(this.region);
4529    wals.close();
4530    this.region = null;
4531  }
4532
4533  @Test
4534  public void testRegionReplicaSecondary() throws IOException {
4535    // create a primary region, load some data and flush
4536    // create a secondary region, and do a get against that
4537    Path rootDir = new Path(dir + name.getMethodName());
4538    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4539
4540    byte[][] families = new byte[][] {
4541        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4542    };
4543    byte[] cq = Bytes.toBytes("cq");
4544    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4545    for (byte[] family : families) {
4546      htd.addFamily(new HColumnDescriptor(family));
4547    }
4548
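    // Primary and secondary share the same boundaries and regionId; only the replicaId
    // (the last constructor argument: 0 vs 1) differs.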
4549    long time = System.currentTimeMillis();
4550    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4551      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4552      false, time, 0);
4553    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4554      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4555      false, time, 1);
4556
4557    HRegion primaryRegion = null, secondaryRegion = null;
4558
4559    try {
4560      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4561          rootDir, TEST_UTIL.getConfiguration(), htd);
4562
4563      // load some data
4564      putData(primaryRegion, 0, 1000, cq, families);
4565
4566      // flush region
4567      primaryRegion.flush(true);
4568
4569      // open secondary region
4570      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4571
4572      verifyData(secondaryRegion, 0, 1000, cq, families);
4573    } finally {
4574      if (primaryRegion != null) {
4575        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4576      }
4577      if (secondaryRegion != null) {
4578        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4579      }
4580    }
4581  }
4582
4583  @Test
4584  public void testRegionReplicaSecondaryIsReadOnly() throws IOException {
4585    // create a primary region, load some data and flush
4586    // create a secondary region, and do a put against that
4587    Path rootDir = new Path(dir + name.getMethodName());
4588    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4589
4590    byte[][] families = new byte[][] {
4591        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4592    };
4593    byte[] cq = Bytes.toBytes("cq");
4594    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4595    for (byte[] family : families) {
4596      htd.addFamily(new HColumnDescriptor(family));
4597    }
4598
4599    long time = System.currentTimeMillis();
4600    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4601      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4602      false, time, 0);
4603    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4604      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4605      false, time, 1);
4606
4607    HRegion primaryRegion = null, secondaryRegion = null;
4608
4609    try {
4610      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4611          rootDir, TEST_UTIL.getConfiguration(), htd);
4612
4613      // load some data
4614      putData(primaryRegion, 0, 1000, cq, families);
4615
4616      // flush region
4617      primaryRegion.flush(true);
4618
4619      // open secondary region
4620      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4621
4622      try {
4623        putData(secondaryRegion, 0, 1000, cq, families);
4624        fail("Should have thrown exception");
4625      } catch (IOException ex) {
4626        // expected
4627      }
4628    } finally {
4629      if (primaryRegion != null) {
4630        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4631      }
4632      if (secondaryRegion != null) {
4633        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4634      }
4635    }
4636  }
4637
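  /**
   * Creates a WALFactory whose root dir points at the given path, so tests can place WAL files
   * under their own test directory.
   */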
4638  static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
4639    Configuration confForWAL = new Configuration(conf);
4640    confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
4641    return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8));
4642  }
4643
4644  @Test
4645  public void testCompactionFromPrimary() throws IOException {
4646    Path rootDir = new Path(dir + name.getMethodName());
4647    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4648
4649    byte[][] families = new byte[][] {
4650        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4651    };
4652    byte[] cq = Bytes.toBytes("cq");
4653    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
4654    for (byte[] family : families) {
4655      htd.addFamily(new HColumnDescriptor(family));
4656    }
4657
4658    long time = System.currentTimeMillis();
4659    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4660      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4661      false, time, 0);
4662    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4663      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4664      false, time, 1);
4665
4666    HRegion primaryRegion = null, secondaryRegion = null;
4667
4668    try {
4669      primaryRegion = HBaseTestingUtility.createRegionAndWAL(primaryHri,
4670          rootDir, TEST_UTIL.getConfiguration(), htd);
4671
4672      // load some data
4673      putData(primaryRegion, 0, 1000, cq, families);
4674
4675      // flush region
4676      primaryRegion.flush(true);
4677
4678      // open secondary region
4679      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4680
4681      // move the file of the primary region to the archive, simulating a compaction
4682      Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
4683      primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
4684      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
4685          .getStoreFiles(families[0]);
4686      Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
4687
4688      verifyData(secondaryRegion, 0, 1000, cq, families);
4689    } finally {
4690      if (primaryRegion != null) {
4691        HBaseTestingUtility.closeRegionAndWAL(primaryRegion);
4692      }
4693      if (secondaryRegion != null) {
4694        HBaseTestingUtility.closeRegionAndWAL(secondaryRegion);
4695      }
4696    }
4697  }
4698
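  // Helpers that load rows [startRow, startRow + numRows), writing one (null-valued) cell per
  // given family for each row.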
4699  private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws
4700      IOException {
4701    putData(this.region, startRow, numRows, qf, families);
4702  }
4703
4704  private void putData(HRegion region,
4705      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4706    putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
4707  }
4708
4709  static void putData(HRegion region, Durability durability,
4710      int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
4711    for (int i = startRow; i < startRow + numRows; i++) {
4712      Put put = new Put(Bytes.toBytes("" + i));
4713      put.setDurability(durability);
4714      for (byte[] family : families) {
4715        put.addColumn(family, qf, null);
4716      }
4717      region.put(put);
4718      LOG.info(put.toString());
4719    }
4720  }
4721
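  // Verifies that each row in [startRow, startRow + numRows) has exactly one cell per family
  // with the expected row, family and qualifier.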
4722  static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
4723      throws IOException {
4724    for (int i = startRow; i < startRow + numRows; i++) {
4725      byte[] row = Bytes.toBytes("" + i);
4726      Get get = new Get(row);
4727      for (byte[] family : families) {
4728        get.addColumn(family, qf);
4729      }
4730      Result result = newReg.get(get);
4731      Cell[] raw = result.rawCells();
4732      assertEquals(families.length, result.size());
4733      for (int j = 0; j < families.length; j++) {
4734        assertTrue(CellUtil.matchingRows(raw[j], row));
4735        assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
4736        assertTrue(CellUtil.matchingQualifier(raw[j], qf));
4737      }
4738    }
4739  }
4740
4741  static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
4742    // Now I have k, get values out and assert they are as expected.
4743    Get get = new Get(k).addFamily(family).readAllVersions();
4744    Cell[] results = r.get(get).rawCells();
4745    for (int j = 0; j < results.length; j++) {
4746      byte[] tmp = CellUtil.cloneValue(results[j]);
4747      // Row should be equal to value every time.
4748      assertTrue(Bytes.equals(k, tmp));
4749    }
4750  }
4751
4752  /*
4753   * Assert that the first value in the passed region is <code>firstValue</code>.
4754   *
4755   * @param r region to scan
4756   *
4757   * @param fs column family to scan
4758   *
4759   * @param firstValue expected value of the first cell returned
4760   *
4761   * @throws IOException
4762   */
4763  protected void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
4764      throws IOException {
4765    byte[][] families = { fs };
4766    Scan scan = new Scan();
4767    for (int i = 0; i < families.length; i++)
4768      scan.addFamily(families[i]);
4769    InternalScanner s = r.getScanner(scan);
4770    try {
4771      List<Cell> curVals = new ArrayList<>();
4772      boolean first = true;
4773      OUTER_LOOP: while (s.next(curVals)) {
4774        for (Cell kv : curVals) {
4775          byte[] val = CellUtil.cloneValue(kv);
4776          byte[] curval = val;
4777          if (first) {
4778            first = false;
4779            assertTrue(Bytes.compareTo(curval, firstValue) == 0);
4780          } else {
4781            // Not asserting anything. Might as well break.
4782            break OUTER_LOOP;
4783          }
4784        }
4785      }
4786    } finally {
4787      s.close();
4788    }
4789  }
4790
4791  /**
4792   * Test that we get the expected flush results back
4793   */
4794  @Test
4795  public void testFlushResult() throws IOException {
4796    byte[] family = Bytes.toBytes("family");
4797
4798    this.region = initHRegion(tableName, method, family);
4799
4800    // empty memstore, flush doesn't run
4801    HRegion.FlushResult fr = region.flush(true);
4802    assertFalse(fr.isFlushSucceeded());
4803    assertFalse(fr.isCompactionNeeded());
4804
4805    // Flush enough times to create store files up to the threshold; compaction is not needed yet
4806    for (int i = 0; i < 2; i++) {
4807      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
4808      region.put(put);
4809      fr = region.flush(true);
4810      assertTrue(fr.isFlushSucceeded());
4811      assertFalse(fr.isCompactionNeeded());
4812    }
4813
4814    // Two flushes after the threshold, compactions are needed
4815    for (int i = 0; i < 2; i++) {
4816      Put put = new Put(tableName.toBytes()).addColumn(family, family, tableName.toBytes());
4817      region.put(put);
4818      fr = region.flush(true);
4819      assertTrue(fr.isFlushSucceeded());
4820      assertTrue(fr.isCompactionNeeded());
4821    }
4822  }
4823
4824  protected Configuration initSplit() {
4825    // Always compact if there is more than one store file.
4826    CONF.setInt("hbase.hstore.compactionThreshold", 2);
4827
4828    CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
4829
4830    // Increase the amount of time between client retries
4831    CONF.setLong("hbase.client.pause", 15 * 1000);
4832
4833    // This size should make it so we always split using the addContent
4834    // below. After adding all data, the first region is 1.3M
4835    CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
4836    return CONF;
4837  }
4838
4839  /**
4840   * @return A region on which you must call
4841   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4842   */
4843  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
4844      byte[]... families) throws IOException {
4845    return initHRegion(tableName, callingMethod, conf, false, families);
4846  }
4847
4848  /**
4849   * @return A region on which you must call
4850   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4851   */
4852  protected HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
4853      boolean isReadOnly, byte[]... families) throws IOException {
4854    return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
4855  }
4856
4857  protected HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
4858      String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
4859      throws IOException {
4860    Path logDir = TEST_UTIL.getDataTestDirOnTestFS(callingMethod + ".log");
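    // Initialize the MemStoreLAB chunk creator; required when creating regions directly in
    // tests rather than inside a running RegionServer.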
4861    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
4862    HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
4863    final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
4864    return initHRegion(tableName, startKey, stopKey, isReadOnly,
4865        Durability.SYNC_WAL, wal, families);
4866  }
4867
4868  /**
4869   * @return A region on which you must call
4870   *         {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done.
4871   */
4872  public HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
4873      boolean isReadOnly, Durability durability, WAL wal, byte[]... families) throws IOException {
4874    return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey,
4875        isReadOnly, durability, wal, families);
4876  }
4877
4878  /**
4879   * Assert that the passed in Cell has expected contents for the specified row,
4880   * column & timestamp.
4881   */
4882  private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
4883    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
4884    assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx,
4885        Bytes.toString(CellUtil.cloneRow(kv)));
4886    assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
4887        Bytes.toString(CellUtil.cloneFamily(kv)));
4888    assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
4889        Bytes.toString(CellUtil.cloneQualifier(kv)));
4890    assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
4891    assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
4892        Bytes.toString(CellUtil.cloneValue(kv)));
4893  }
4894
4895  @Test
4896  public void testReverseScanner_FromMemStore_SingleCF_Normal()
4897      throws IOException {
4898    byte[] rowC = Bytes.toBytes("rowC");
4899    byte[] rowA = Bytes.toBytes("rowA");
4900    byte[] rowB = Bytes.toBytes("rowB");
4901    byte[] cf = Bytes.toBytes("CF");
4902    byte[][] families = { cf };
4903    byte[] col = Bytes.toBytes("C");
4904    long ts = 1;
4905    this.region = initHRegion(tableName, method, families);
4906    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
4907    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
4908        null);
4909    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
4910    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
4911    Put put = null;
4912    put = new Put(rowC);
4913    put.add(kv1);
4914    put.add(kv11);
4915    region.put(put);
4916    put = new Put(rowA);
4917    put.add(kv2);
4918    region.put(put);
4919    put = new Put(rowB);
4920    put.add(kv3);
4921    region.put(put);
4922
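    // A reverse scan starting at rowC should return rows in the order rowC, rowB, rowA.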
4923    Scan scan = new Scan(rowC);
4924    scan.setMaxVersions(5);
4925    scan.setReversed(true);
4926    InternalScanner scanner = region.getScanner(scan);
4927    List<Cell> currRow = new ArrayList<>();
4928    boolean hasNext = scanner.next(currRow);
4929    assertEquals(2, currRow.size());
4930    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4931        .get(0).getRowLength(), rowC, 0, rowC.length));
4932    assertTrue(hasNext);
4933    currRow.clear();
4934    hasNext = scanner.next(currRow);
4935    assertEquals(1, currRow.size());
4936    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4937        .get(0).getRowLength(), rowB, 0, rowB.length));
4938    assertTrue(hasNext);
4939    currRow.clear();
4940    hasNext = scanner.next(currRow);
4941    assertEquals(1, currRow.size());
4942    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4943        .get(0).getRowLength(), rowA, 0, rowA.length));
4944    assertFalse(hasNext);
4945    scanner.close();
4946  }
4947
4948  @Test
4949  public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
4950      throws IOException {
4951    byte[] rowC = Bytes.toBytes("rowC");
4952    byte[] rowA = Bytes.toBytes("rowA");
4953    byte[] rowB = Bytes.toBytes("rowB");
4954    byte[] rowD = Bytes.toBytes("rowD");
4955    byte[] cf = Bytes.toBytes("CF");
4956    byte[][] families = { cf };
4957    byte[] col = Bytes.toBytes("C");
4958    long ts = 1;
4959    this.region = initHRegion(tableName, method, families);
4960    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
4961    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
4962        null);
4963    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
4964    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
4965    Put put = null;
4966    put = new Put(rowC);
4967    put.add(kv1);
4968    put.add(kv11);
4969    region.put(put);
4970    put = new Put(rowA);
4971    put.add(kv2);
4972    region.put(put);
4973    put = new Put(rowB);
4974    put.add(kv3);
4975    region.put(put);
4976
4977    Scan scan = new Scan(rowD);
4978    List<Cell> currRow = new ArrayList<>();
4979    scan.setReversed(true);
4980    scan.setMaxVersions(5);
4981    InternalScanner scanner = region.getScanner(scan);
4982    boolean hasNext = scanner.next(currRow);
4983    assertEquals(2, currRow.size());
4984    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4985        .get(0).getRowLength(), rowC, 0, rowC.length));
4986    assertTrue(hasNext);
4987    currRow.clear();
4988    hasNext = scanner.next(currRow);
4989    assertEquals(1, currRow.size());
4990    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4991        .get(0).getRowLength(), rowB, 0, rowB.length));
4992    assertTrue(hasNext);
4993    currRow.clear();
4994    hasNext = scanner.next(currRow);
4995    assertEquals(1, currRow.size());
4996    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
4997        .get(0).getRowLength(), rowA, 0, rowA.length));
4998    assertFalse(hasNext);
4999    scanner.close();
5000  }
5001
5002  @Test
5003  public void testReverseScanner_FromMemStore_SingleCF_FullScan()
5004      throws IOException {
5005    byte[] rowC = Bytes.toBytes("rowC");
5006    byte[] rowA = Bytes.toBytes("rowA");
5007    byte[] rowB = Bytes.toBytes("rowB");
5008    byte[] cf = Bytes.toBytes("CF");
5009    byte[][] families = { cf };
5010    byte[] col = Bytes.toBytes("C");
5011    long ts = 1;
5012    this.region = initHRegion(tableName, method, families);
5013    KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5014    KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5015        null);
5016    KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5017    KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5018    Put put = null;
5019    put = new Put(rowC);
5020    put.add(kv1);
5021    put.add(kv11);
5022    region.put(put);
5023    put = new Put(rowA);
5024    put.add(kv2);
5025    region.put(put);
5026    put = new Put(rowB);
5027    put.add(kv3);
5028    region.put(put);
5029    Scan scan = new Scan();
5030    List<Cell> currRow = new ArrayList<>();
5031    scan.setReversed(true);
5032    InternalScanner scanner = region.getScanner(scan);
5033    boolean hasNext = scanner.next(currRow);
5034    assertEquals(1, currRow.size());
5035    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5036        .get(0).getRowLength(), rowC, 0, rowC.length));
5037    assertTrue(hasNext);
5038    currRow.clear();
5039    hasNext = scanner.next(currRow);
5040    assertEquals(1, currRow.size());
5041    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5042        .get(0).getRowLength(), rowB, 0, rowB.length));
5043    assertTrue(hasNext);
5044    currRow.clear();
5045    hasNext = scanner.next(currRow);
5046    assertEquals(1, currRow.size());
5047    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5048        .get(0).getRowLength(), rowA, 0, rowA.length));
5049    assertFalse(hasNext);
5050    scanner.close();
5051  }
5052
5053  @Test
5054  public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
5055    // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
5056    byte[] rowA = Bytes.toBytes("rowA");
5057    byte[] rowB = Bytes.toBytes("rowB");
5058    byte[] rowC = Bytes.toBytes("rowC");
5059    byte[] rowD = Bytes.toBytes("rowD");
5060    byte[] rowE = Bytes.toBytes("rowE");
5061    byte[] cf = Bytes.toBytes("CF");
5062    byte[][] families = { cf };
5063    byte[] col1 = Bytes.toBytes("col1");
5064    byte[] col2 = Bytes.toBytes("col2");
5065    long ts = 1;
5066    this.region = initHRegion(tableName, method, families);
5067    KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5068    KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5069    KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5070    KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5071    KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5072    KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5073    Put put = null;
5074    put = new Put(rowA);
5075    put.add(kv1);
5076    region.put(put);
5077    put = new Put(rowB);
5078    put.add(kv2);
5079    region.put(put);
5080    put = new Put(rowC);
5081    put.add(kv3);
5082    region.put(put);
5083    put = new Put(rowD);
5084    put.add(kv4_1);
5085    region.put(put);
5086    put = new Put(rowD);
5087    put.add(kv4_2);
5088    region.put(put);
5089    put = new Put(rowE);
5090    put.add(kv5);
5091    region.put(put);
5092    region.flush(true);
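    // Reverse scan over [rowD, rowA): expect rowD, rowC, rowB; rowA is excluded because the
    // stop row of a reverse scan is exclusive.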
5093    Scan scan = new Scan(rowD, rowA);
5094    scan.addColumn(families[0], col1);
5095    scan.setReversed(true);
5096    List<Cell> currRow = new ArrayList<>();
5097    InternalScanner scanner = region.getScanner(scan);
5098    boolean hasNext = scanner.next(currRow);
5099    assertEquals(1, currRow.size());
5100    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5101        .get(0).getRowLength(), rowD, 0, rowD.length));
5102    assertTrue(hasNext);
5103    currRow.clear();
5104    hasNext = scanner.next(currRow);
5105    assertEquals(1, currRow.size());
5106    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5107        .get(0).getRowLength(), rowC, 0, rowC.length));
5108    assertTrue(hasNext);
5109    currRow.clear();
5110    hasNext = scanner.next(currRow);
5111    assertEquals(1, currRow.size());
5112    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5113        .get(0).getRowLength(), rowB, 0, rowB.length));
5114    assertFalse(hasNext);
5115    scanner.close();
5116
5117    scan = new Scan(rowD, rowA);
5118    scan.addColumn(families[0], col2);
5119    scan.setReversed(true);
5120    currRow.clear();
5121    scanner = region.getScanner(scan);
5122    hasNext = scanner.next(currRow);
5123    assertEquals(1, currRow.size());
5124    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5125        .get(0).getRowLength(), rowD, 0, rowD.length));
5126    scanner.close();
5127  }
5128
5129  @Test
5130  public void testReverseScanner_smaller_blocksize() throws IOException {
5131    // case to ensure no conflict with HFile index optimization
5132    byte[] rowA = Bytes.toBytes("rowA");
5133    byte[] rowB = Bytes.toBytes("rowB");
5134    byte[] rowC = Bytes.toBytes("rowC");
5135    byte[] rowD = Bytes.toBytes("rowD");
5136    byte[] rowE = Bytes.toBytes("rowE");
5137    byte[] cf = Bytes.toBytes("CF");
5138    byte[][] families = { cf };
5139    byte[] col1 = Bytes.toBytes("col1");
5140    byte[] col2 = Bytes.toBytes("col2");
5141    long ts = 1;
5142    HBaseConfiguration config = new HBaseConfiguration();
5143    config.setInt("test.block.size", 1);
5144    this.region = initHRegion(tableName, method, config, families);
5145    KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5146    KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5147    KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5148    KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5149    KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5150    KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5151    Put put = null;
5152    put = new Put(rowA);
5153    put.add(kv1);
5154    region.put(put);
5155    put = new Put(rowB);
5156    put.add(kv2);
5157    region.put(put);
5158    put = new Put(rowC);
5159    put.add(kv3);
5160    region.put(put);
5161    put = new Put(rowD);
5162    put.add(kv4_1);
5163    region.put(put);
5164    put = new Put(rowD);
5165    put.add(kv4_2);
5166    region.put(put);
5167    put = new Put(rowE);
5168    put.add(kv5);
5169    region.put(put);
5170    region.flush(true);
5171    Scan scan = new Scan(rowD, rowA);
5172    scan.addColumn(families[0], col1);
5173    scan.setReversed(true);
5174    List<Cell> currRow = new ArrayList<>();
5175    InternalScanner scanner = region.getScanner(scan);
5176    boolean hasNext = scanner.next(currRow);
5177    assertEquals(1, currRow.size());
5178    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5179        .get(0).getRowLength(), rowD, 0, rowD.length));
5180    assertTrue(hasNext);
5181    currRow.clear();
5182    hasNext = scanner.next(currRow);
5183    assertEquals(1, currRow.size());
5184    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5185        .get(0).getRowLength(), rowC, 0, rowC.length));
5186    assertTrue(hasNext);
5187    currRow.clear();
5188    hasNext = scanner.next(currRow);
5189    assertEquals(1, currRow.size());
5190    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5191        .get(0).getRowLength(), rowB, 0, rowB.length));
5192    assertFalse(hasNext);
5193    scanner.close();
5194
5195    scan = new Scan(rowD, rowA);
5196    scan.addColumn(families[0], col2);
5197    scan.setReversed(true);
5198    currRow.clear();
5199    scanner = region.getScanner(scan);
5200    hasNext = scanner.next(currRow);
5201    assertEquals(1, currRow.size());
5202    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5203        .get(0).getRowLength(), rowD, 0, rowD.length));
5204    scanner.close();
5205  }
5206
5207  @Test
5208  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
5209      throws IOException {
5210    byte[] row0 = Bytes.toBytes("row0"); // 1 kv
5211    byte[] row1 = Bytes.toBytes("row1"); // 2 kv
5212    byte[] row2 = Bytes.toBytes("row2"); // 4 kv
5213    byte[] row3 = Bytes.toBytes("row3"); // 2 kv
5214    byte[] row4 = Bytes.toBytes("row4"); // 5 kv
5215    byte[] row5 = Bytes.toBytes("row5"); // 2 kv
5216    byte[] cf1 = Bytes.toBytes("CF1");
5217    byte[] cf2 = Bytes.toBytes("CF2");
5218    byte[] cf3 = Bytes.toBytes("CF3");
5219    byte[][] families = { cf1, cf2, cf3 };
5220    byte[] col = Bytes.toBytes("C");
5221    long ts = 1;
5222    HBaseConfiguration conf = new HBaseConfiguration();
5223    // disable compactions in this test.
5224    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5225    this.region = initHRegion(tableName, method, conf, families);
5226    // kv naming convention: kv<rowNumber>_<totalKvCountInThisRow>_<seqNo>
5227    KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
5228        null);
5229    KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
5230        null);
5231    KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
5232        KeyValue.Type.Put, null);
5233    KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
5234        null);
5235    KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
5236        null);
5237    KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
5238        null);
5239    KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
5240        KeyValue.Type.Put, null);
5241    KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
5242        null);
5243    KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
5244        KeyValue.Type.Put, null);
5245    KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
5246        null);
5247    KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
5248        null);
5249    KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
5250        KeyValue.Type.Put, null);
5251    KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
5252        null);
5253    KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
5254        KeyValue.Type.Put, null);
5255    KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
5256        null);
5257    KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
5258        null);
5259    // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv)
5260    Put put = null;
5261    put = new Put(row1);
5262    put.add(kv1_2_1);
5263    region.put(put);
5264    put = new Put(row2);
5265    put.add(kv2_4_1);
5266    region.put(put);
5267    put = new Put(row4);
5268    put.add(kv4_5_4);
5269    put.add(kv4_5_5);
5270    region.put(put);
5271    region.flush(true);
5272    // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv)
5273    put = new Put(row4);
5274    put.add(kv4_5_1);
5275    put.add(kv4_5_3);
5276    region.put(put);
5277    put = new Put(row1);
5278    put.add(kv1_2_2);
5279    region.put(put);
5280    put = new Put(row2);
5281    put.add(kv2_4_4);
5282    region.put(put);
5283    region.flush(true);
5284    // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv)
5285    put = new Put(row4);
5286    put.add(kv4_5_2);
5287    region.put(put);
5288    put = new Put(row2);
5289    put.add(kv2_4_2);
5290    put.add(kv2_4_3);
5291    region.put(put);
5292    put = new Put(row3);
5293    put.add(kv3_2_2);
5294    region.put(put);
5295    region.flush(true);
5296    // memstore(cf1/cf2/cf3) : "row0" (1 kv) / "row3" (1 kv) / "row5" (2 kv, the largest row)
5298    put = new Put(row0);
5299    put.add(kv0_1_1);
5300    region.put(put);
5301    put = new Put(row3);
5302    put.add(kv3_2_1);
5303    region.put(put);
5304    put = new Put(row5);
5305    put.add(kv5_2_1);
5306    put.add(kv5_2_2);
5307    region.put(put);
5308    // scan range = ["row4", min), skip the max "row5"
5309    Scan scan = new Scan(row4);
5310    scan.setMaxVersions(5);
5311    scan.setBatch(3);
5312    scan.setReversed(true);
5313    InternalScanner scanner = region.getScanner(scan);
5314    List<Cell> currRow = new ArrayList<>();
5315    boolean hasNext = false;
5316    // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
5317    // included in scan range
5318    // "row4" takes 2 next() calls since batch=3
5319    hasNext = scanner.next(currRow);
5320    assertEquals(3, currRow.size());
5321    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5322        .get(0).getRowLength(), row4, 0, row4.length));
5323    assertTrue(hasNext);
5324    currRow.clear();
5325    hasNext = scanner.next(currRow);
5326    assertEquals(2, currRow.size());
5327    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(),
5328        currRow.get(0).getRowLength(), row4, 0,
5329      row4.length));
5330    assertTrue(hasNext);
5331    // 2. scan out "row3" (2 kv)
5332    currRow.clear();
5333    hasNext = scanner.next(currRow);
5334    assertEquals(2, currRow.size());
5335    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5336        .get(0).getRowLength(), row3, 0, row3.length));
5337    assertTrue(hasNext);
5338    // 3. scan out "row2" (4 kvs)
5339    // "row2" takes 2 next() calls since batch=3
5340    currRow.clear();
5341    hasNext = scanner.next(currRow);
5342    assertEquals(3, currRow.size());
5343    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5344        .get(0).getRowLength(), row2, 0, row2.length));
5345    assertTrue(hasNext);
5346    currRow.clear();
5347    hasNext = scanner.next(currRow);
5348    assertEquals(1, currRow.size());
5349    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5350        .get(0).getRowLength(), row2, 0, row2.length));
5351    assertTrue(hasNext);
5352    // 4. scan out "row1" (2 kv)
5353    currRow.clear();
5354    hasNext = scanner.next(currRow);
5355    assertEquals(2, currRow.size());
5356    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5357        .get(0).getRowLength(), row1, 0, row1.length));
5358    assertTrue(hasNext);
5359    // 5. scan out "row0" (1 kv)
5360    currRow.clear();
5361    hasNext = scanner.next(currRow);
5362    assertEquals(1, currRow.size());
5363    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5364        .get(0).getRowLength(), row0, 0, row0.length));
5365    assertFalse(hasNext);
5366
5367    scanner.close();
5368  }
5369
5370  @Test
5371  public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
5372      throws IOException {
5373    byte[] row1 = Bytes.toBytes("row1");
5374    byte[] row2 = Bytes.toBytes("row2");
5375    byte[] row3 = Bytes.toBytes("row3");
5376    byte[] row4 = Bytes.toBytes("row4");
5377    byte[] cf1 = Bytes.toBytes("CF1");
5378    byte[] cf2 = Bytes.toBytes("CF2");
5379    byte[] cf3 = Bytes.toBytes("CF3");
5380    byte[] cf4 = Bytes.toBytes("CF4");
5381    byte[][] families = { cf1, cf2, cf3, cf4 };
5382    byte[] col = Bytes.toBytes("C");
5383    long ts = 1;
5384    HBaseConfiguration conf = new HBaseConfiguration();
5385    // disable compactions in this test.
5386    conf.setInt("hbase.hstore.compactionThreshold", 10000);
5387    this.region = initHRegion(tableName, method, conf, families);
5388    KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
5389    KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
5390    KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
5391    KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
5392    // storefile1
5393    Put put = new Put(row1);
5394    put.add(kv1);
5395    region.put(put);
5396    region.flush(true);
5397    // storefile2
5398    put = new Put(row2);
5399    put.add(kv2);
5400    region.put(put);
5401    region.flush(true);
5402    // storefile3
5403    put = new Put(row3);
5404    put.add(kv3);
5405    region.put(put);
5406    region.flush(true);
5407    // memstore
5408    put = new Put(row4);
5409    put.add(kv4);
5410    region.put(put);
5411    // scan range = ["row4", min)
5412    Scan scan = new Scan(row4);
5413    scan.setReversed(true);
5414    scan.setBatch(10);
5415    InternalScanner scanner = region.getScanner(scan);
5416    List<Cell> currRow = new ArrayList<>();
5417    boolean hasNext = scanner.next(currRow);
5418    assertEquals(1, currRow.size());
5419    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5420        .get(0).getRowLength(), row4, 0, row4.length));
5421    assertTrue(hasNext);
5422    currRow.clear();
5423    hasNext = scanner.next(currRow);
5424    assertEquals(1, currRow.size());
5425    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5426        .get(0).getRowLength(), row3, 0, row3.length));
5427    assertTrue(hasNext);
5428    currRow.clear();
5429    hasNext = scanner.next(currRow);
5430    assertEquals(1, currRow.size());
5431    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5432        .get(0).getRowLength(), row2, 0, row2.length));
5433    assertTrue(hasNext);
5434    currRow.clear();
5435    hasNext = scanner.next(currRow);
5436    assertEquals(1, currRow.size());
5437    assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow
5438        .get(0).getRowLength(), row1, 0, row1.length));
5439    assertFalse(hasNext);
5440  }
5441
5442  /**
5443   * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking
5444   */
5445  @Test
5446  public void testReverseScanner_StackOverflow() throws IOException {
5447    byte[] cf1 = Bytes.toBytes("CF1");
5448    byte[][] families = {cf1};
5449    byte[] col = Bytes.toBytes("C");
5450    HBaseConfiguration conf = new HBaseConfiguration();
5451    this.region = initHRegion(tableName, method, conf, families);
5452    // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5453    Put put = new Put(Bytes.toBytes("19998"));
5454    put.addColumn(cf1, col, Bytes.toBytes("val"));
5455    region.put(put);
5456    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5457    Put put2 = new Put(Bytes.toBytes("19997"));
5458    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5459    region.put(put2);
5460
5461    Scan scan = new Scan(Bytes.toBytes("19998"));
5462    scan.setReversed(true);
5463    InternalScanner scanner = region.getScanner(scan);
5464
5465    // create one storefile that contains many rows which will be skipped
5466    // to check StoreFileScanner.seekToPreviousRow
5467    for (int i = 10000; i < 20000; i++) {
5468      Put p = new Put(Bytes.toBytes(""+i));
5469      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5470      region.put(p);
5471    }
5472    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5473
5474    // create one memstore that contains many rows which will be skipped
5475    // to check MemStoreScanner.seekToPreviousRow
5476    for (int i = 10000; i < 20000; i++) {
5477      Put p = new Put(Bytes.toBytes(""+i));
5478      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5479      region.put(p);
5480    }
5481
5482    List<Cell> currRow = new ArrayList<>();
5483    boolean hasNext;
5484    do {
5485      hasNext = scanner.next(currRow);
5486    } while (hasNext);
5487    assertEquals(2, currRow.size());
5488    assertEquals("19998", Bytes.toString(currRow.get(0).getRowArray(),
5489      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5490    assertEquals("19997", Bytes.toString(currRow.get(1).getRowArray(),
5491      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5492  }
5493
5494  @Test
5495  public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exception {
5496    byte[] cf1 = Bytes.toBytes("CF1");
5497    byte[][] families = { cf1 };
5498    byte[] col = Bytes.toBytes("C");
5499    HBaseConfiguration conf = new HBaseConfiguration();
5500    this.region = initHRegion(tableName, method, conf, families);
5501    // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5502    Put put = new Put(Bytes.toBytes("19996"));
5503    put.addColumn(cf1, col, Bytes.toBytes("val"));
5504    region.put(put);
5505    Put put2 = new Put(Bytes.toBytes("19995"));
5506    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5507    region.put(put2);
5508    // create a reverse scan
5509    Scan scan = new Scan(Bytes.toBytes("19996"));
5510    scan.setReversed(true);
5511    RegionScannerImpl scanner = region.getScanner(scan);
5512
5513    // flush the cache. This will reset the store scanner
5514    region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
5515
5516    // create one memstore that contains many rows which will be skipped
5517    // to check MemStoreScanner.seekToPreviousRow
5518    for (int i = 10000; i < 20000; i++) {
5519      Put p = new Put(Bytes.toBytes("" + i));
5520      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5521      region.put(p);
5522    }
5523    List<Cell> currRow = new ArrayList<>();
5524    boolean hasNext;
5525    boolean assertDone = false;
5526    do {
5527      hasNext = scanner.next(currRow);
5528      // With HBASE-15871, after the scanner is reset the memstore scanner should not be
5529      // added here
5530      if (!assertDone) {
5531        StoreScanner current =
5532            (StoreScanner) scanner.storeHeap.getCurrentForTesting();
5533        List<KeyValueScanner> scanners = current.getAllScannersForTesting();
5534        assertEquals("There should be only one scanner, the store file scanner", 1,
5535          scanners.size());
5536        assertDone = true;
5537      }
5538    } while (hasNext);
5539    assertEquals(2, currRow.size());
5540    assertEquals("19996", Bytes.toString(currRow.get(0).getRowArray(),
5541      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5542    assertEquals("19995", Bytes.toString(currRow.get(1).getRowArray(),
5543      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5544  }
5545
5546  @Test
5547  public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception {
5548    byte[] cf1 = Bytes.toBytes("CF1");
5549    byte[][] families = { cf1 };
5550    byte[] col = Bytes.toBytes("C");
5551
5552    Configuration conf = HBaseConfiguration.create();
5553    this.region = initHRegion(tableName, method, conf, families);
5554
5555    Put put = new Put(Bytes.toBytes("199996"));
5556    put.addColumn(cf1, col, Bytes.toBytes("val"));
5557    region.put(put);
5558    Put put2 = new Put(Bytes.toBytes("199995"));
5559    put2.addColumn(cf1, col, Bytes.toBytes("val"));
5560    region.put(put2);
5561
5562    // Create a reverse scan
5563    Scan scan = new Scan(Bytes.toBytes("199996"));
5564    scan.setReversed(true);
5565    RegionScannerImpl scanner = region.getScanner(scan);
5566
5567    // Put a lot of cells that have sequenceIDs greater than the readPt of the reverse scan
5568    for (int i = 100000; i < 200000; i++) {
5569      Put p = new Put(Bytes.toBytes("" + i));
5570      p.addColumn(cf1, col, Bytes.toBytes("" + i));
5571      region.put(p);
5572    }
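    // Drain the reverse scanner. The cells put after the scanner was opened have sequence ids
    // greater than its read point, so only the two original rows should come back.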
5573    List<Cell> currRow = new ArrayList<>();
5574    boolean hasNext;
5575    do {
5576      hasNext = scanner.next(currRow);
5577    } while (hasNext);
5578
5579    assertEquals(2, currRow.size());
5580    assertEquals("199996", Bytes.toString(currRow.get(0).getRowArray(),
5581      currRow.get(0).getRowOffset(), currRow.get(0).getRowLength()));
5582    assertEquals("199995", Bytes.toString(currRow.get(1).getRowArray(),
5583      currRow.get(1).getRowOffset(), currRow.get(1).getRowLength()));
5584  }
5585
5586  @Test
5587  public void testWriteRequestsCounter() throws IOException {
5588    byte[] fam = Bytes.toBytes("info");
5589    byte[][] families = { fam };
5590    this.region = initHRegion(tableName, method, CONF, families);
5591
5592    Assert.assertEquals(0L, region.getWriteRequestsCount());
5593
5594    Put put = new Put(row);
5595    put.addColumn(fam, fam, fam);
5596
5597    Assert.assertEquals(0L, region.getWriteRequestsCount());
5598    region.put(put);
5599    Assert.assertEquals(1L, region.getWriteRequestsCount());
5600    region.put(put);
5601    Assert.assertEquals(2L, region.getWriteRequestsCount());
5602    region.put(put);
5603    Assert.assertEquals(3L, region.getWriteRequestsCount());
5604
5605    region.delete(new Delete(row));
5606    Assert.assertEquals(4L, region.getWriteRequestsCount());
5607  }
5608
5609  @Test
5610  public void testOpenRegionWrittenToWAL() throws Exception {
5611    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
5612    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5613
5614    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
5615    htd.addFamily(new HColumnDescriptor(fam1));
5616    htd.addFamily(new HColumnDescriptor(fam2));
5617
5618    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
5619      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
5620
5621    // open the region w/o rss and wal and flush some files
5622    region =
5623         HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
5624             .getConfiguration(), htd);
5625    assertNotNull(region);
5626
5627    // create a file in fam1 for the region before opening in OpenRegionHandler
5628    region.put(new Put(Bytes.toBytes("a")).addColumn(fam1, fam1, fam1));
5629    region.flush(true);
5630    HBaseTestingUtility.closeRegionAndWAL(region);
5631
5632    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5633
5634    // capture append() calls
5635    WAL wal = mockWAL();
5636    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
5637
5638    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5639      TEST_UTIL.getConfiguration(), rss, null);
5640
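    // opening the region should have appended exactly one region event marker to the WAL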
5641    verify(wal, times(1)).append((HRegionInfo) any(), (WALKeyImpl) any(),
5642      editCaptor.capture(), anyBoolean());
5643
5644    WALEdit edit = editCaptor.getValue();
5645    assertNotNull(edit);
5646    assertNotNull(edit.getCells());
5647    assertEquals(1, edit.getCells().size());
5648    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5649    assertNotNull(desc);
5650
5651    LOG.info("RegionEventDescriptor from WAL: " + desc);
5652
5653    assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
5654    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
5655    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5656      hri.getEncodedNameAsBytes()));
5657    assertTrue(desc.getLogSequenceNumber() > 0);
5658    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5659    assertEquals(2, desc.getStoresCount());
5660
5661    StoreDescriptor store = desc.getStores(0);
5662    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
5663    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
5664    assertEquals(1, store.getStoreFileCount()); // 1 store file
5665    assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
5666
5667    store = desc.getStores(1);
5668    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
5669    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
5670    assertEquals(0, store.getStoreFileCount()); // no store files
5671  }
5672
5673  // Helper for test testOpenRegionWrittenToWALForLogReplay
5674  static class HRegionWithSeqId extends HRegion {
5675    public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
5676        final Configuration confParam, final RegionInfo regionInfo,
5677        final TableDescriptor htd, final RegionServerServices rsServices) {
5678      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
5679    }
5680    @Override
5681    protected long getNextSequenceId(WAL wal) throws IOException {
5682      return 42;
5683    }
5684  }
5685
5686  @Test
5687  public void testFlushedFileWithNoTags() throws Exception {
5688    final TableName tableName = TableName.valueOf(name.getMethodName());
5689    HTableDescriptor htd = new HTableDescriptor(tableName);
5690    htd.addFamily(new HColumnDescriptor(fam1));
5691    HRegionInfo info = new HRegionInfo(tableName, null, null, false);
5692    Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName());
5693    region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
5694    Put put = new Put(Bytes.toBytes("a-b-0-0"));
5695    put.addColumn(fam1, qual1, Bytes.toBytes("c1-value"));
5696    region.put(put);
5697    region.flush(true);
5698    HStore store = region.getStore(fam1);
5699    Collection<HStoreFile> storefiles = store.getStorefiles();
5700    for (HStoreFile sf : storefiles) {
5701      assertFalse("Tags should not be present",
5702        sf.getReader().getHFileReader().getFileContext().isIncludesTags());
5703    }
5704  }
5705
5706  /**
5707   * Utility method to set up a mock WAL.
5708   * The mock's append must complete the MVCC write entry on the WALKeyImpl, else the test hangs.
5709   * @return a mock WAL
5710   * @throws IOException
5711   */
5712  private WAL mockWAL() throws IOException {
5713    WAL wal = mock(WAL.class);
5714    Mockito.when(wal.append((HRegionInfo)Mockito.any(),
5715        (WALKeyImpl)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())).
5716      thenAnswer(new Answer<Long>() {
5717        @Override
5718        public Long answer(InvocationOnMock invocation) throws Throwable {
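          // As the real WAL would on append: begin an MVCC transaction and attach its write
          // entry to the key, so the caller waiting on the write entry does not hang.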
5719          WALKeyImpl key = invocation.getArgument(1);
5720          MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
5721          key.setWriteEntry(we);
5722          return 1L;
5723        }
5724
5725    });
5726    return wal;
5727  }
5728
5729  @Test
5730  public void testCloseRegionWrittenToWAL() throws Exception {
5731
5732    Path rootDir = new Path(dir + name.getMethodName());
5733    FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
5734
5735    final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
5736    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5737
5738    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
5739    htd.addFamily(new HColumnDescriptor(fam1));
5740    htd.addFamily(new HColumnDescriptor(fam2));
5741
5742    final HRegionInfo hri = new HRegionInfo(htd.getTableName(),
5743      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
5744
5745    ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5746
5747    // capture append() calls
5748    WAL wal = mockWAL();
5749    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
5750
5752    // create and then open a region first so that it can be closed later
5753    region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri));
5754    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
5755      TEST_UTIL.getConfiguration(), rss, null);
5756
5757    // close the region
5758    region.close(false);
5759
5760    // 2 times, one for the region open, the other for the region close
5761    verify(wal, times(2)).append((HRegionInfo)any(), (WALKeyImpl)any(),
5762      editCaptor.capture(), anyBoolean());
5763
5764    WALEdit edit = editCaptor.getAllValues().get(1);
5765    assertNotNull(edit);
5766    assertNotNull(edit.getCells());
5767    assertEquals(1, edit.getCells().size());
5768    RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
5769    assertNotNull(desc);
5770
5771    LOG.info("RegionEventDescriptor from WAL: " + desc);
5772
5773    assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType());
5774    assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getTableName().toBytes()));
5775    assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
5776      hri.getEncodedNameAsBytes()));
5777    assertTrue(desc.getLogSequenceNumber() > 0);
5778    assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
5779    assertEquals(2, desc.getStoresCount());
5780
5781    StoreDescriptor store = desc.getStores(0);
5782    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
5783    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
5784    assertEquals(0, store.getStoreFileCount()); // no store files
5785
5786    store = desc.getStores(1);
5787    assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
5788    assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
5789    assertEquals(0, store.getStoreFileCount()); // no store files
5790  }
5791
5792  /**
5793   * Test RegionTooBusyException thrown when region is busy
5794   */
5795  @Test
5796  public void testRegionTooBusy() throws IOException {
5797    byte[] family = Bytes.toBytes("family");
5798    long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration",
5799      HRegion.DEFAULT_BUSY_WAIT_DURATION);
5800    CONF.setLong("hbase.busy.wait.duration", 1000);
5801    region = initHRegion(tableName, method, CONF, family);
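    // Grab the region's write lock in a separate thread so that the get below cannot acquire
    // its lock and instead times out with RegionTooBusyException.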
5802    final AtomicBoolean stopped = new AtomicBoolean(true);
5803    Thread t = new Thread(new Runnable() {
5804      @Override
5805      public void run() {
5806        try {
5807          region.lock.writeLock().lock();
5808          stopped.set(false);
5809          while (!stopped.get()) {
5810            Thread.sleep(100);
5811          }
5812        } catch (InterruptedException ie) {
5813        } finally {
5814          region.lock.writeLock().unlock();
5815        }
5816      }
5817    });
5818    t.start();
5819    Get get = new Get(row);
5820    try {
5821      while (stopped.get()) {
5822        Thread.sleep(100);
5823      }
5824      region.get(get);
5825      fail("Should throw RegionTooBusyException");
5826    } catch (InterruptedException ie) {
5827      fail("test interrupted");
5828    } catch (RegionTooBusyException e) {
5829      // Good, expected
5830    } finally {
5831      stopped.set(true);
5832      try {
5833        t.join();
5834      } catch (Throwable e) {
5835      }
5836
5837      HBaseTestingUtility.closeRegionAndWAL(region);
5838      region = null;
5839      CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration);
5840    }
5841  }
5842
5843  @Test
5844  public void testCellTTLs() throws IOException {
5845    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
5846    EnvironmentEdgeManager.injectEdge(edge);
5847
5848    final byte[] row = Bytes.toBytes("testRow");
5849    final byte[] q1 = Bytes.toBytes("q1");
5850    final byte[] q2 = Bytes.toBytes("q2");
5851    final byte[] q3 = Bytes.toBytes("q3");
5852    final byte[] q4 = Bytes.toBytes("q4");
5853
5854    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
5855    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
5856    hcd.setTimeToLive(10); // 10 seconds
5857    htd.addFamily(hcd);
5858
5859    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
5860    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
5861
5862    region = HBaseTestingUtility.createRegionAndWAL(new HRegionInfo(htd.getTableName(),
5863            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
5864        TEST_UTIL.getDataTestDir(), conf, htd);
5865    assertNotNull(region);
5866    long now = EnvironmentEdgeManager.currentTime();
5867    // Add a cell that will expire in 5 seconds via cell TTL
5868    region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
5869      HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
5870        // TTL tags specify ts in milliseconds
5871        new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) })));
5872    // Add a cell that will expire after 10 seconds via family setting
5873    region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
5874    // Add a cell that will expire in 15 seconds via cell TTL
5875    region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
5876      HConstants.EMPTY_BYTE_ARRAY, new ArrayBackedTag[] {
5877        // TTL tags specify ts in milliseconds
5878        new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) })));
5879    // Add a cell that will expire in 20 seconds via family setting
5880    region.put(new Put(row).addColumn(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
5881
5882    // Flush so we are sure store scanning gets this right
5883    region.flush(true);
5884
5885    // A query at time T+0 should return all cells
5886    Result r = region.get(new Get(row));
5887    assertNotNull(r.getValue(fam1, q1));
5888    assertNotNull(r.getValue(fam1, q2));
5889    assertNotNull(r.getValue(fam1, q3));
5890    assertNotNull(r.getValue(fam1, q4));
5891
5892    // Increment time to T+5 seconds
5893    edge.incrementTime(5000);
5894
5895    r = region.get(new Get(row));
5896    assertNull(r.getValue(fam1, q1));
5897    assertNotNull(r.getValue(fam1, q2));
5898    assertNotNull(r.getValue(fam1, q3));
5899    assertNotNull(r.getValue(fam1, q4));
5900
5901    // Increment time to T+10 seconds
5902    edge.incrementTime(5000);
5903
5904    r = region.get(new Get(row));
5905    assertNull(r.getValue(fam1, q1));
5906    assertNull(r.getValue(fam1, q2));
5907    assertNotNull(r.getValue(fam1, q3));
5908    assertNotNull(r.getValue(fam1, q4));
5909
5910    // Increment time to T+15 seconds
5911    edge.incrementTime(5000);
5912
5913    r = region.get(new Get(row));
5914    assertNull(r.getValue(fam1, q1));
5915    assertNull(r.getValue(fam1, q2));
5916    assertNull(r.getValue(fam1, q3));
5917    assertNotNull(r.getValue(fam1, q4));
5918
5919    // Increment time to T+20 seconds
5920    edge.incrementTime(10000);
5921
5922    r = region.get(new Get(row));
5923    assertNull(r.getValue(fam1, q1));
5924    assertNull(r.getValue(fam1, q2));
5925    assertNull(r.getValue(fam1, q3));
5926    assertNull(r.getValue(fam1, q4));
5927
5928    // Fun with disappearing increments
5929
5930    // Start at 1
5931    region.put(new Put(row).addColumn(fam1, q1, Bytes.toBytes(1L)));
5932    r = region.get(new Get(row));
5933    byte[] val = r.getValue(fam1, q1);
5934    assertNotNull(val);
5935    assertEquals(1L, Bytes.toLong(val));
5936
5937    // Increment with a TTL of 5 seconds
5938    Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
5939    incr.setTTL(5000);
5940    region.increment(incr); // 2
5941
5942    // New value should be 2
5943    r = region.get(new Get(row));
5944    val = r.getValue(fam1, q1);
5945    assertNotNull(val);
5946    assertEquals(2L, Bytes.toLong(val));
5947
5948    // Increment time to T+25 seconds
5949    edge.incrementTime(5000);
5950
5951    // Value should be back to 1
5952    r = region.get(new Get(row));
5953    val = r.getValue(fam1, q1);
5954    assertNotNull(val);
5955    assertEquals(1L, Bytes.toLong(val));
5956
5957    // Increment time to T+30 seconds
5958    edge.incrementTime(5000);
5959
5960    // Original value written at T+20 should be gone now via family TTL
5961    r = region.get(new Get(row));
5962    assertNull(r.getValue(fam1, q1));
5963  }
5964
5965  @Test
5966  public void testIncrementTimestampsAreMonotonic() throws IOException {
5967    region = initHRegion(tableName, method, CONF, fam1);
5968    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
5969    EnvironmentEdgeManager.injectEdge(edge);
5970
5971    edge.setValue(10);
5972    Increment inc = new Increment(row);
5973    inc.setDurability(Durability.SKIP_WAL);
5974    inc.addColumn(fam1, qual1, 1L);
5975    region.increment(inc);
5976
5977    Result result = region.get(new Get(row));
5978    Cell c = result.getColumnLatestCell(fam1, qual1);
5979    assertNotNull(c);
5980    assertEquals(10L, c.getTimestamp());
5981
5982    edge.setValue(1); // clock goes back
5983    region.increment(inc);
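    // Even though the clock moved backwards, the incremented cell must get a timestamp
    // greater than the previous one (10), i.e. timestamps stay monotonic.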
5984    result = region.get(new Get(row));
5985    c = result.getColumnLatestCell(fam1, qual1);
5986    assertEquals(11L, c.getTimestamp());
5987    assertEquals(2L, Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
5988  }
5989
5990  @Test
5991  public void testAppendTimestampsAreMonotonic() throws IOException {
5992    region = initHRegion(tableName, method, CONF, fam1);
5993    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
5994    EnvironmentEdgeManager.injectEdge(edge);
5995
5996    edge.setValue(10);
5997    Append a = new Append(row);
5998    a.setDurability(Durability.SKIP_WAL);
5999    a.addColumn(fam1, qual1, qual1);
6000    region.append(a);
6001
6002    Result result = region.get(new Get(row));
6003    Cell c = result.getColumnLatestCell(fam1, qual1);
6004    assertNotNull(c);
6005    assertEquals(10L, c.getTimestamp());
6006
6007    edge.setValue(1); // clock goes back
6008    region.append(a);
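    // As with increments, the appended cell's timestamp must advance past the previous
    // timestamp (10) even though the clock went backwards.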
6009    result = region.get(new Get(row));
6010    c = result.getColumnLatestCell(fam1, qual1);
6011    assertEquals(11L, c.getTimestamp());
6012
6013    byte[] expected = new byte[qual1.length*2];
6014    System.arraycopy(qual1, 0, expected, 0, qual1.length);
6015    System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
6016
6017    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6018      expected, 0, expected.length));
6019  }
6020
6021  @Test
6022  public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
6023    region = initHRegion(tableName, method, CONF, fam1);
6024    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6025    EnvironmentEdgeManager.injectEdge(edge);
6026
6027    edge.setValue(10);
6028    Put p = new Put(row);
6029    p.setDurability(Durability.SKIP_WAL);
6030    p.addColumn(fam1, qual1, qual1);
6031    region.put(p);
6032
6033    Result result = region.get(new Get(row));
6034    Cell c = result.getColumnLatestCell(fam1, qual1);
6035    assertNotNull(c);
6036    assertEquals(10L, c.getTimestamp());
6037
6038    edge.setValue(1); // clock goes back
6039    p = new Put(row);
6040    p.setDurability(Durability.SKIP_WAL);
6041    p.addColumn(fam1, qual1, qual2);
6042    region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p);
6043    result = region.get(new Get(row));
6044    c = result.getColumnLatestCell(fam1, qual1);
6045    assertEquals(10L, c.getTimestamp());
6046
6047    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6048      qual2, 0, qual2.length));
6049  }
6050
6051  @Test
6052  public void testBatchMutateWithWrongRegionException() throws Exception {
6053    final byte[] a = Bytes.toBytes("a");
6054    final byte[] b = Bytes.toBytes("b");
6055    final byte[] c = Bytes.toBytes("c"); // exclusive
6056
6057    int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
6058    CONF.setInt("hbase.rowlock.wait.duration", 1000);
6059    region = initHRegion(tableName, a, c, method, CONF, false, fam1);
6060
6061    Mutation[] mutations = new Mutation[] {
6062        new Put(a)
6063            .add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6064              .setRow(a)
6065              .setFamily(fam1)
6066              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6067              .setType(Cell.Type.Put)
6068              .build()),
6069        // this is outside the region boundary
6070        new Put(c).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6071              .setRow(c)
6072              .setFamily(fam1)
6073              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6074              .setType(Type.Put)
6075              .build()),
6076        new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6077              .setRow(b)
6078              .setFamily(fam1)
6079              .setTimestamp(HConstants.LATEST_TIMESTAMP)
6080              .setType(Cell.Type.Put)
6081              .build())
6082    };
6083
6084    OperationStatus[] status = region.batchMutate(mutations);
6085    assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6086    assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, status[1].getOperationStatusCode());
6087    assertEquals(OperationStatusCode.SUCCESS, status[2].getOperationStatusCode());
6088
6090    // test with a row lock held for a long time
6091    final CountDownLatch obtainedRowLock = new CountDownLatch(1);
6092    ExecutorService exec = Executors.newFixedThreadPool(2);
6093    Future<Void> f1 = exec.submit(new Callable<Void>() {
6094      @Override
6095      public Void call() throws Exception {
6096        LOG.info("Acquiring row lock");
6097        RowLock rl = region.getRowLock(b);
6098        obtainedRowLock.countDown();
6099        LOG.info("Waiting for 5 seconds before releasing lock");
6100        Threads.sleep(5000);
6101        LOG.info("Releasing row lock");
6102        rl.release();
6103        return null;
6104      }
6105    });
6106    obtainedRowLock.await(30, TimeUnit.SECONDS);
6107
6108    Future<Void> f2 = exec.submit(new Callable<Void>() {
6109      @Override
6110      public Void call() throws Exception {
6111        Mutation[] mutations = new Mutation[] {
6112            new Put(a).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6113                .setRow(a)
6114                .setFamily(fam1)
6115                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6116                .setType(Cell.Type.Put)
6117                .build()),
6118            new Put(b).add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
6119                .setRow(b)
6120                .setFamily(fam1)
6121                .setTimestamp(HConstants.LATEST_TIMESTAMP)
6122                .setType(Cell.Type.Put)
6123                .build()),
6124        };
6125
6126        // this will wait for the row lock, and it will eventually succeed
6127        OperationStatus[] status = region.batchMutate(mutations);
6128        assertEquals(OperationStatusCode.SUCCESS, status[0].getOperationStatusCode());
6129        assertEquals(OperationStatusCode.SUCCESS, status[1].getOperationStatusCode());
6130        return null;
6131      }
6132    });
6133
6134    f1.get();
6135    f2.get();
6136
6137    CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
6138  }
6139
6140  @Test
6141  public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
6142    region = initHRegion(tableName, method, CONF, fam1);
6143    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6144    EnvironmentEdgeManager.injectEdge(edge);
6145
6146    edge.setValue(10);
6147    Put p = new Put(row);
6148    p.setDurability(Durability.SKIP_WAL);
6149    p.addColumn(fam1, qual1, qual1);
6150    region.put(p);
6151
6152    Result result = region.get(new Get(row));
6153    Cell c = result.getColumnLatestCell(fam1, qual1);
6154    assertNotNull(c);
6155    assertEquals(10L, c.getTimestamp());
6156
6157    edge.setValue(1); // clock goes back
6158    p = new Put(row);
6159    p.setDurability(Durability.SKIP_WAL);
6160    p.addColumn(fam1, qual1, qual2);
6161    RowMutations rm = new RowMutations(row);
6162    rm.add(p);
6163    assertTrue(region.checkAndRowMutate(row, fam1, qual1, CompareOperator.EQUAL,
6164        new BinaryComparator(qual1), rm));
6165    result = region.get(new Get(row));
6166    c = result.getColumnLatestCell(fam1, qual1);
6167    assertEquals(10L, c.getTimestamp());
6168    LOG.info("c value " +
6169      Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
6170
6171    assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6172      qual2, 0, qual2.length));
6173  }
6174
6175  HRegion initHRegion(TableName tableName, String callingMethod,
6176      byte[]... families) throws IOException {
6177    return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
6178        families);
6179  }
6180
6181  /**
6182   * HBASE-16429 Make sure a roll of the writer does not get stuck when the ring buffer is full of appends
6183   * @throws IOException if IO error occurred during test
6184   */
6185  @Test
6186  public void testWritesWhileRollWriter() throws IOException {
6187    int testCount = 10;
6188    int numRows = 1024;
6189    int numFamilies = 2;
6190    int numQualifiers = 2;
6191    final byte[][] families = new byte[numFamilies][];
6192    for (int i = 0; i < numFamilies; i++) {
6193      families[i] = Bytes.toBytes("family" + i);
6194    }
6195    final byte[][] qualifiers = new byte[numQualifiers][];
6196    for (int i = 0; i < numQualifiers; i++) {
6197      qualifiers[i] = Bytes.toBytes("qual" + i);
6198    }
6199
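    // Use a tiny ring buffer so that concurrent appends fill it while the writer is rolled,
    // exercising the HBASE-16429 scenario.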
6200    CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
6201    this.region = initHRegion(tableName, method, CONF, families);
6202    try {
6203      List<Thread> threads = new ArrayList<>();
6204      for (int i = 0; i < numRows; i++) {
6205        final int count = i;
6206        Thread t = new Thread(new Runnable() {
6207
6208          @Override
6209          public void run() {
6210            byte[] row = Bytes.toBytes("row" + count);
6211            Put put = new Put(row);
6212            put.setDurability(Durability.SYNC_WAL);
6213            byte[] value = Bytes.toBytes(String.valueOf(count));
6214            for (byte[] family : families) {
6215              for (byte[] qualifier : qualifiers) {
6216                put.addColumn(family, qualifier, count, value);
6217              }
6218            }
6219            try {
6220              region.put(put);
6221            } catch (IOException e) {
6222              throw new RuntimeException(e);
6223            }
6224          }
6225        });
6226        threads.add(t);
6227      }
6228      for (Thread t : threads) {
6229        t.start();
6230      }
6231
6232      for (int i = 0; i < testCount; i++) {
6233        region.getWAL().rollWriter();
6234        Thread.yield();
6235      }
6236    } finally {
6237      try {
6238        HBaseTestingUtility.closeRegionAndWAL(this.region);
6239        CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
6240      } catch (DroppedSnapshotException dse) {
6241        // We could get this on way out because we interrupt the background flusher and it could
6242        // fail anywhere causing a DSE over in the background flusher... only it is not properly
6243        // dealt with so could still be memory hanging out when we get to here -- memory we can't
6244        // flush because the accounting is 'off' since original DSE.
6245      }
6246      this.region = null;
6247    }
6248  }
6249
6250  @Test
6251  public void testMutateRow_WriteRequestCount() throws Exception {
6252    byte[] row1 = Bytes.toBytes("row1");
6253    byte[] fam1 = Bytes.toBytes("fam1");
6254    byte[] qf1 = Bytes.toBytes("qualifier");
6255    byte[] val1 = Bytes.toBytes("value1");
6256
6257    RowMutations rm = new RowMutations(row1);
6258    Put put = new Put(row1);
6259    put.addColumn(fam1, qf1, val1);
6260    rm.add(put);
6261
6262    this.region = initHRegion(tableName, method, CONF, fam1);
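    // mutateRow should bump the write request count by the number of mutations in the batch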
6263    long wrcBeforeMutate = this.region.writeRequestsCount.longValue();
6264    this.region.mutateRow(rm);
6265    long wrcAfterMutate = this.region.writeRequestsCount.longValue();
6266    Assert.assertEquals(wrcBeforeMutate + rm.getMutations().size(), wrcAfterMutate);
6267  }
6268
6269  @Test
6270  public void testBulkLoadReplicationEnabled() throws IOException {
6271    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
6272    final ServerName serverName = ServerName.valueOf(name.getMethodName(), 100, 42);
6273    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6274
6275    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
6276    htd.addFamily(new HColumnDescriptor(fam1));
6277    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6278        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6279    region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(),
6280        rss, null);
6281
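    // with bulk load replication enabled, the ReplicationObserver coprocessor should be loaded
    // for the region automatically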
6282    assertTrue(region.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false));
6283    String plugins = region.conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
6284    String replicationCoprocessorClass = ReplicationObserver.class.getCanonicalName();
6285    assertTrue(plugins.contains(replicationCoprocessorClass));
6286    assertTrue(region.getCoprocessorHost().
6287        getCoprocessors().contains(ReplicationObserver.class.getSimpleName()));
6288  }
6289
6290  // make sure the region is closed successfully when a broken coprocessor makes the region open fail
6291  @Test
6292  public void testOpenRegionFailedMemoryLeak() throws Exception {
6293    final ServerName serverName = ServerName.valueOf("testOpenRegionFailed", 100, 42);
6294    final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6295
6296    HTableDescriptor htd
6297      = new HTableDescriptor(TableName.valueOf("testOpenRegionFailed"));
6298    htd.addFamily(new HColumnDescriptor(fam1));
6299    htd.setValue("COPROCESSOR$1", "hdfs://test/test.jar|test||");
6300
6301    HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6302      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6303    ScheduledExecutorService executor = CompatibilitySingletonFactory.
6304      getInstance(MetricsExecutor.class).getExecutor();
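    // Each open attempt fails because of the bogus coprocessor spec; the failed opens must
    // still clean up their metrics tasks so the shared executor's queue does not grow.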
6305    for (int i = 0; i < 20; i++) {
6306      try {
6307        HRegion.openHRegion(hri, htd, rss.getWAL(hri),
6308          TEST_UTIL.getConfiguration(), rss, null);
6309      } catch (Throwable t) {
6310        LOG.info("Expected exception, continue");
6311      }
6312    }
6313    TimeUnit.SECONDS.sleep(MetricsRegionWrapperImpl.PERIOD);
6314    Field[] fields = ThreadPoolExecutor.class.getDeclaredFields();
6315    boolean found = false;
6316    for (Field field : fields) {
6317      if (field.getName().equals("workQueue")) {
6318        field.setAccessible(true);
6319        BlockingQueue<Runnable> workQueue = (BlockingQueue<Runnable>) field.get(executor);
6320        // two periodic tasks are expected to remain uncancelled; they do not cause a memory leak
6321        Assert.assertTrue("ScheduledExecutor#workQueue size should be 2 but is " +
6322          workQueue.size() + "; check that the region was closed", 2 == workQueue.size());
6323        found = true;
6324      }
6325    }
6326    Assert.assertTrue("cannot find the workQueue field, test failed", found);
6327  }
6328
6329  /**
6330   * The same as the HRegion class, except that instantiateHStore creates an
6331   * HStoreForTesting instead of an HStore. [HBASE-8518]
6332   */
6333  public static class HRegionForTesting extends HRegion {
6334
6335    public HRegionForTesting(final Path tableDir, final WAL wal, final FileSystem fs,
6336                             final Configuration confParam, final RegionInfo regionInfo,
6337                             final TableDescriptor htd, final RegionServerServices rsServices) {
6338      this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
6339          wal, confParam, htd, rsServices);
6340    }
6341
6342    public HRegionForTesting(HRegionFileSystem fs, WAL wal,
6343                             Configuration confParam, TableDescriptor htd,
6344                             RegionServerServices rsServices) {
6345      super(fs, wal, confParam, htd, rsServices);
6346    }
6347
6348    /**
6349     * Create HStore instance.
6350     * @return If Mob is enabled, return HMobStore, otherwise return HStoreForTesting.
6351     */
6352    @Override
6353    protected HStore instantiateHStore(final ColumnFamilyDescriptor family, boolean warmup)
6354        throws IOException {
6355      if (family.isMobEnabled()) {
6356        if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
6357          throw new IOException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS +
6358              " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY +
6359              " accordingly.");
6360        }
6361        return new HMobStore(this, family, this.conf, warmup);
6362      }
6363      return new HStoreForTesting(this, family, this.conf, warmup);
6364    }
6365  }
6366
6367  /**
6368   * HStoreForTesting is the same as HStore except for its doCompaction method, which honors
6369   * the "hbase.hstore.compaction.complete" flag: when the flag is set to false, the compaction
6370   * is not allowed to complete. Previously this flag was checked inside HStore#compact itself,
6371   * even though it is only useful for testing. With HBASE-8518 the
6372   * "hbase.hstore.compaction.complete" config is removed from production code and kept only in
6373   * test code.
6374   */
6375  public static class HStoreForTesting extends HStore {
6376
6377    protected HStoreForTesting(final HRegion region,
6378        final ColumnFamilyDescriptor family,
6379        final Configuration confParam, boolean warmup) throws IOException {
6380      super(region, family, confParam, warmup);
6381    }
6382
6383    @Override
6384    protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
6385        Collection<HStoreFile> filesToCompact, User user, long compactionStartTime,
6386        List<Path> newFiles) throws IOException {
6387      // leave the compaction incomplete.
6388      if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
6389        LOG.warn("hbase.hstore.compaction.complete is set to false");
6390        List<HStoreFile> sfs = new ArrayList<>(newFiles.size());
6391        final boolean evictOnClose =
6392            cacheConf != null ? cacheConf.shouldEvictOnClose() : true;
6393        for (Path newFile : newFiles) {
6394          // Create storefile around what we wrote with a reader on it.
6395          HStoreFile sf = createStoreFileAndReader(newFile);
6396          sf.closeStoreFile(evictOnClose);
6397          sfs.add(sf);
6398        }
6399        return sfs;
6400      }
6401      return super.doCompaction(cr, filesToCompact, user, compactionStartTime, newFiles);
6402    }
6403  }
6404}