001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.List;
031import java.util.Optional;
032import java.util.Random;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicInteger;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.Coprocessor;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HColumnDescriptor;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.HTableDescriptor;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
051import org.apache.hadoop.hbase.coprocessor.ObserverContext;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
054import org.apache.hadoop.hbase.coprocessor.RegionObserver;
055import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
056import org.apache.hadoop.hbase.ipc.ServerRpcController;
057import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
058import org.apache.hadoop.hbase.regionserver.HRegion;
059import org.apache.hadoop.hbase.regionserver.HRegionServer;
060import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
061import org.apache.hadoop.hbase.regionserver.RegionScanner;
062import org.apache.hadoop.hbase.testclassification.ClientTests;
063import org.apache.hadoop.hbase.testclassification.LargeTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.Pair;
066import org.junit.After;
067import org.junit.AfterClass;
068import org.junit.Assert;
069import org.junit.Before;
070import org.junit.BeforeClass;
071import org.junit.ClassRule;
072import org.junit.Rule;
073import org.junit.Test;
074import org.junit.experimental.categories.Category;
075import org.junit.rules.TestName;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
081
082@Category({LargeTests.class, ClientTests.class})
083public class TestFromClientSide3 {
084
085  @ClassRule
086  public static final HBaseClassTestRule CLASS_RULE =
087      HBaseClassTestRule.forClass(TestFromClientSide3.class);
088
089  private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class);
090  private final static HBaseTestingUtility TEST_UTIL
091    = new HBaseTestingUtility();
092  private static final int WAITTABLE_MILLIS = 10000;
093  private static byte[] FAMILY = Bytes.toBytes("testFamily");
094  private static Random random = new Random();
095  private static int SLAVES = 3;
096  private static final byte[] ROW = Bytes.toBytes("testRow");
097  private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow");
098  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
099  private static final byte[] VALUE = Bytes.toBytes("testValue");
100  private static final byte[] COL_QUAL = Bytes.toBytes("f1");
101  private static final byte[] VAL_BYTES = Bytes.toBytes("v1");
102  private static final byte[] ROW_BYTES = Bytes.toBytes("r1");
103
104  @Rule
105  public TestName name = new TestName();
106  private TableName tableName;
107
108  /**
109   * @throws java.lang.Exception
110   */
111  @BeforeClass
112  public static void setUpBeforeClass() throws Exception {
113    TEST_UTIL.startMiniCluster(SLAVES);
114  }
115
116  /**
117   * @throws java.lang.Exception
118   */
119  @AfterClass
120  public static void tearDownAfterClass() throws Exception {
121    TEST_UTIL.shutdownMiniCluster();
122  }
123
124  /**
125   * @throws java.lang.Exception
126   */
127  @Before
128  public void setUp() throws Exception {
129    tableName = TableName.valueOf(name.getMethodName());
130  }
131
132  /**
133   * @throws java.lang.Exception
134   */
135  @After
136  public void tearDown() throws Exception {
137    for (HTableDescriptor htd: TEST_UTIL.getAdmin().listTables()) {
138      LOG.info("Tear down, remove table=" + htd.getTableName());
139      TEST_UTIL.deleteTable(htd.getTableName());
140  }
141  }
142
143  private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts)
144      throws Exception {
145    Put put = new Put(row);
146    for (int i = 0; i < nPuts; i++) {
147      byte[] qualifier = Bytes.toBytes(random.nextInt());
148      byte[] value = Bytes.toBytes(random.nextInt());
149      put.addColumn(family, qualifier, value);
150    }
151    table.put(put);
152  }
153
154  private void performMultiplePutAndFlush(HBaseAdmin admin, Table table,
155      byte[] row, byte[] family, int nFlushes, int nPuts)
156  throws Exception {
157
158    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(table.getName())) {
159      // connection needed for poll-wait
160      HRegionLocation loc = locator.getRegionLocation(row, true);
161      AdminProtos.AdminService.BlockingInterface server =
162        ((ClusterConnection) admin.getConnection()).getAdmin(loc.getServerName());
163      byte[] regName = loc.getRegionInfo().getRegionName();
164
165      for (int i = 0; i < nFlushes; i++) {
166        randomCFPuts(table, row, family, nPuts);
167        List<String> sf = ProtobufUtil.getStoreFiles(server, regName, FAMILY);
168        int sfCount = sf.size();
169
170        admin.flush(table.getName());
171      }
172    }
173  }
174
175  private static List<Cell> toList(ResultScanner scanner) {
176    try {
177      List<Cell> cells = new ArrayList<>();
178      for (Result r : scanner) {
179        cells.addAll(r.listCells());
180      }
181      return cells;
182    } finally {
183      scanner.close();
184    }
185  }
186
187  @Test
188  public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException {
189    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
190      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
191      byte[] row = Bytes.toBytes("SpecifiedRow");
192      byte[] value0 = Bytes.toBytes("value_0");
193      byte[] value1 = Bytes.toBytes("value_1");
194      Put put = new Put(row);
195      put.addColumn(FAMILY, QUALIFIER, VALUE);
196      table.put(put);
197      Delete d = new Delete(row);
198      table.delete(d);
199      put = new Put(row);
200      put.addColumn(FAMILY, null, value0);
201      table.put(put);
202      put = new Put(row);
203      put.addColumn(FAMILY, null, value1);
204      table.put(put);
205      List<Cell> cells = toList(table.getScanner(new Scan()));
206      assertEquals(1, cells.size());
207      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
208
209      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
210      assertEquals(1, cells.size());
211      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
212
213      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
214      assertEquals(0, cells.size());
215
216      TEST_UTIL.getAdmin().flush(tableName);
217      cells = toList(table.getScanner(new Scan()));
218      assertEquals(1, cells.size());
219      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
220
221      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
222      assertEquals(1, cells.size());
223      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
224
225      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
226      assertEquals(0, cells.size());
227    }
228  }
229
230  @Test
231  public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException {
232    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
233      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
234      byte[] row = Bytes.toBytes("SpecifiedRow");
235      byte[] qual0 = Bytes.toBytes("qual0");
236      byte[] qual1 = Bytes.toBytes("qual1");
237      Delete d = new Delete(row);
238      table.delete(d);
239
240      Put put = new Put(row);
241      put.addColumn(FAMILY, null, VALUE);
242      table.put(put);
243
244      put = new Put(row);
245      put.addColumn(FAMILY, qual1, qual1);
246      table.put(put);
247
248      put = new Put(row);
249      put.addColumn(FAMILY, qual0, qual0);
250      table.put(put);
251
252      Result r = table.get(new Get(row));
253      assertEquals(3, r.size());
254      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
255      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
256      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
257
258      TEST_UTIL.getAdmin().flush(tableName);
259      r = table.get(new Get(row));
260      assertEquals(3, r.size());
261      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
262      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
263      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
264    }
265  }
266
267  // override the config settings at the CF level and ensure priority
268  @Test
269  public void testAdvancedConfigOverride() throws Exception {
270    /*
271     * Overall idea: (1) create 3 store files and issue a compaction. config's
272     * compaction.min == 3, so should work. (2) Increase the compaction.min
273     * toggle in the HTD to 5 and modify table. If we use the HTD value instead
274     * of the default config value, adding 3 files and issuing a compaction
275     * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2
276     * and modify table. The CF schema should override the Table schema and now
277     * cause a minor compaction.
278     */
279    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3);
280
281    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
282      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
283      try (Admin admin = TEST_UTIL.getAdmin()) {
284        ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection();
285
286        // Create 3 store files.
287        byte[] row = Bytes.toBytes(random.nextInt());
288        performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 100);
289
290        try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
291          // Verify we have multiple store files.
292          HRegionLocation loc = locator.getRegionLocation(row, true);
293          byte[] regionName = loc.getRegionInfo().getRegionName();
294          AdminProtos.AdminService.BlockingInterface server =
295                  connection.getAdmin(loc.getServerName());
296          assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() > 1);
297
298          // Issue a compaction request
299          admin.compact(tableName);
300
301          // poll wait for the compactions to happen
302          for (int i = 0; i < 10 * 1000 / 40; ++i) {
303            // The number of store files after compaction should be lesser.
304            loc = locator.getRegionLocation(row, true);
305            if (!loc.getRegionInfo().isOffline()) {
306              regionName = loc.getRegionInfo().getRegionName();
307              server = connection.getAdmin(loc.getServerName());
308              if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1) {
309                break;
310              }
311            }
312            Thread.sleep(40);
313          }
314          // verify the compactions took place and that we didn't just time out
315          assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1);
316
317          // change the compaction.min config option for this table to 5
318          LOG.info("hbase.hstore.compaction.min should now be 5");
319          HTableDescriptor htd = new HTableDescriptor(table.getTableDescriptor());
320          htd.setValue("hbase.hstore.compaction.min", String.valueOf(5));
321          admin.modifyTable(tableName, htd);
322          Pair<Integer, Integer> st = admin.getAlterStatus(tableName);
323          while (null != st && st.getFirst() > 0) {
324            LOG.debug(st.getFirst() + " regions left to update");
325            Thread.sleep(40);
326            st = admin.getAlterStatus(tableName);
327          }
328          LOG.info("alter status finished");
329
330          // Create 3 more store files.
331          performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 10);
332
333          // Issue a compaction request
334          admin.compact(tableName);
335
336          // This time, the compaction request should not happen
337          Thread.sleep(10 * 1000);
338          loc = locator.getRegionLocation(row, true);
339          regionName = loc.getRegionInfo().getRegionName();
340          server = connection.getAdmin(loc.getServerName());
341          int sfCount = ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size();
342          assertTrue(sfCount > 1);
343
344          // change an individual CF's config option to 2 & online schema update
345          LOG.info("hbase.hstore.compaction.min should now be 2");
346          HColumnDescriptor hcd = new HColumnDescriptor(htd.getFamily(FAMILY));
347          hcd.setValue("hbase.hstore.compaction.min", String.valueOf(2));
348          htd.modifyFamily(hcd);
349          admin.modifyTable(tableName, htd);
350          st = admin.getAlterStatus(tableName);
351          while (null != st && st.getFirst() > 0) {
352            LOG.debug(st.getFirst() + " regions left to update");
353            Thread.sleep(40);
354            st = admin.getAlterStatus(tableName);
355          }
356          LOG.info("alter status finished");
357
358          // Issue a compaction request
359          admin.compact(tableName);
360
361          // poll wait for the compactions to happen
362          for (int i = 0; i < 10 * 1000 / 40; ++i) {
363            loc = locator.getRegionLocation(row, true);
364            regionName = loc.getRegionInfo().getRegionName();
365            try {
366              server = connection.getAdmin(loc.getServerName());
367              if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() < sfCount) {
368                break;
369              }
370            } catch (Exception e) {
371              LOG.debug("Waiting for region to come online: " + Bytes.toString(regionName));
372            }
373            Thread.sleep(40);
374          }
375
376          // verify the compaction took place and that we didn't just time out
377          assertTrue(ProtobufUtil.getStoreFiles(
378                  server, regionName, FAMILY).size() < sfCount);
379
380          // Finally, ensure that we can remove a custom config value after we made it
381          LOG.info("Removing CF config value");
382          LOG.info("hbase.hstore.compaction.min should now be 5");
383          hcd = new HColumnDescriptor(htd.getFamily(FAMILY));
384          hcd.setValue("hbase.hstore.compaction.min", null);
385          htd.modifyFamily(hcd);
386          admin.modifyTable(tableName, htd);
387          st = admin.getAlterStatus(tableName);
388          while (null != st && st.getFirst() > 0) {
389            LOG.debug(st.getFirst() + " regions left to update");
390            Thread.sleep(40);
391            st = admin.getAlterStatus(tableName);
392          }
393          LOG.info("alter status finished");
394          assertNull(table.getTableDescriptor().getFamily(FAMILY).getValue(
395                  "hbase.hstore.compaction.min"));
396        }
397      }
398    }
399  }
400
401  @Test
402  public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException {
403    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
404      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
405      List actions = (List) new ArrayList();
406      Object[] results = new Object[2];
407      // create an empty Put
408      Put put1 = new Put(ROW);
409      actions.add(put1);
410
411      Put put2 = new Put(ANOTHERROW);
412      put2.addColumn(FAMILY, QUALIFIER, VALUE);
413      actions.add(put2);
414
415      table.batch(actions, results);
416      fail("Empty Put should have failed the batch call");
417    } catch (IllegalArgumentException iae) {
418    }
419  }
420
421  // Test Table.batch with large amount of mutations against the same key.
422  // It used to trigger read lock's "Maximum lock count exceeded" Error.
423  @Test
424  public void testHTableWithLargeBatch() throws IOException, InterruptedException {
425    int sixtyFourK = 64 * 1024;
426    List actions = new ArrayList();
427    Object[] results = new Object[(sixtyFourK + 1) * 2];
428
429    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
430      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
431
432      for (int i = 0; i < sixtyFourK + 1; i++) {
433        Put put1 = new Put(ROW);
434        put1.addColumn(FAMILY, QUALIFIER, VALUE);
435        actions.add(put1);
436
437        Put put2 = new Put(ANOTHERROW);
438        put2.addColumn(FAMILY, QUALIFIER, VALUE);
439        actions.add(put2);
440      }
441
442      table.batch(actions, results);
443    }
444  }
445
446  @Test
447  public void testBatchWithRowMutation() throws Exception {
448    LOG.info("Starting testBatchWithRowMutation");
449    byte [][] QUALIFIERS = new byte [][] {
450      Bytes.toBytes("a"), Bytes.toBytes("b")
451    };
452
453    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
454      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
455
456      RowMutations arm = RowMutations.of(Collections.singletonList(
457              new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE)));
458      Object[] batchResult = new Object[1];
459      table.batch(Arrays.asList(arm), batchResult);
460
461      Get g = new Get(ROW);
462      Result r = table.get(g);
463      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
464
465      arm = RowMutations.of(Arrays.asList(
466              new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE),
467              new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0])));
468      table.batch(Arrays.asList(arm), batchResult);
469      r = table.get(g);
470      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
471      assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
472
473      // Test that we get the correct remote exception for RowMutations from batch()
474      try {
475        arm = RowMutations.of(Collections.singletonList(
476                new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE)));
477        table.batch(Arrays.asList(arm), batchResult);
478        fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
479      } catch (RetriesExhaustedWithDetailsException e) {
480        String msg = e.getMessage();
481        assertTrue(msg.contains("NoSuchColumnFamilyException"));
482      }
483    }
484  }
485
486  @Test
487  public void testHTableExistsMethodSingleRegionSingleGet()
488          throws IOException, InterruptedException {
489    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
490      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
491
492      // Test with a single region table.
493      Put put = new Put(ROW);
494      put.addColumn(FAMILY, QUALIFIER, VALUE);
495
496      Get get = new Get(ROW);
497
498      boolean exist = table.exists(get);
499      assertFalse(exist);
500
501      table.put(put);
502
503      exist = table.exists(get);
504      assertTrue(exist);
505    }
506  }
507
508  @Test
509  public void testHTableExistsMethodSingleRegionMultipleGets()
510          throws IOException, InterruptedException {
511    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
512      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
513
514      Put put = new Put(ROW);
515      put.addColumn(FAMILY, QUALIFIER, VALUE);
516      table.put(put);
517
518      List<Get> gets = new ArrayList<>();
519      gets.add(new Get(ROW));
520      gets.add(new Get(ANOTHERROW));
521
522      boolean[] results = table.exists(gets);
523      assertTrue(results[0]);
524      assertFalse(results[1]);
525    }
526  }
527
528  @Test
529  public void testHTableExistsBeforeGet() throws IOException, InterruptedException {
530    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
531      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
532
533      Put put = new Put(ROW);
534      put.addColumn(FAMILY, QUALIFIER, VALUE);
535      table.put(put);
536
537      Get get = new Get(ROW);
538
539      boolean exist = table.exists(get);
540      assertEquals(true, exist);
541
542      Result result = table.get(get);
543      assertEquals(false, result.isEmpty());
544      assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER)));
545    }
546  }
547
548  @Test
549  public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException {
550    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
551      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
552
553      final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2"));
554      Put put = new Put(ROW);
555      put.addColumn(FAMILY, QUALIFIER, VALUE);
556      table.put(put);
557      put = new Put(ROW2);
558      put.addColumn(FAMILY, QUALIFIER, VALUE);
559      table.put(put);
560
561      Get get = new Get(ROW);
562      Get get2 = new Get(ROW2);
563      ArrayList<Get> getList = new ArrayList(2);
564      getList.add(get);
565      getList.add(get2);
566
567      boolean[] exists = table.existsAll(getList);
568      assertEquals(true, exists[0]);
569      assertEquals(true, exists[1]);
570
571      Result[] result = table.get(getList);
572      assertEquals(false, result[0].isEmpty());
573      assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER)));
574      assertEquals(false, result[1].isEmpty());
575      assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER)));
576    }
577  }
578
579  @Test
580  public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception {
581    try (Table table = TEST_UTIL.createTable(
582      tableName, new byte[][] { FAMILY },
583      1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
584      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
585
586      Put put = new Put(ROW);
587      put.addColumn(FAMILY, QUALIFIER, VALUE);
588
589      Get get = new Get(ROW);
590
591      boolean exist = table.exists(get);
592      assertFalse(exist);
593
594      table.put(put);
595
596      exist = table.exists(get);
597      assertTrue(exist);
598    }
599  }
600
601  @Test
602  public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception {
603    try (Table table = TEST_UTIL.createTable(
604      tableName,
605      new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
606      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
607
608      Put put = new Put(ROW);
609      put.addColumn(FAMILY, QUALIFIER, VALUE);
610      table.put(put);
611
612      List<Get> gets = new ArrayList<>();
613      gets.add(new Get(ANOTHERROW));
614      gets.add(new Get(Bytes.add(ROW, new byte[]{0x00})));
615      gets.add(new Get(ROW));
616      gets.add(new Get(Bytes.add(ANOTHERROW, new byte[]{0x00})));
617
618      LOG.info("Calling exists");
619      boolean[] results = table.existsAll(gets);
620      assertFalse(results[0]);
621      assertFalse(results[1]);
622      assertTrue(results[2]);
623      assertFalse(results[3]);
624
625      // Test with the first region.
626      put = new Put(new byte[]{0x00});
627      put.addColumn(FAMILY, QUALIFIER, VALUE);
628      table.put(put);
629
630      gets = new ArrayList<>();
631      gets.add(new Get(new byte[]{0x00}));
632      gets.add(new Get(new byte[]{0x00, 0x00}));
633      results = table.existsAll(gets);
634      assertTrue(results[0]);
635      assertFalse(results[1]);
636
637      // Test with the last region
638      put = new Put(new byte[]{(byte) 0xff, (byte) 0xff});
639      put.addColumn(FAMILY, QUALIFIER, VALUE);
640      table.put(put);
641
642      gets = new ArrayList<>();
643      gets.add(new Get(new byte[]{(byte) 0xff}));
644      gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff}));
645      gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff}));
646      results = table.existsAll(gets);
647      assertFalse(results[0]);
648      assertTrue(results[1]);
649      assertFalse(results[2]);
650    }
651  }
652
653  @Test
654  public void testGetEmptyRow() throws Exception {
655    //Create a table and put in 1 row
656    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
657      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
658
659      Put put = new Put(ROW_BYTES);
660      put.addColumn(FAMILY, COL_QUAL, VAL_BYTES);
661      table.put(put);
662
663      //Try getting the row with an empty row key
664      Result res = null;
665      try {
666        res = table.get(new Get(new byte[0]));
667        fail();
668      } catch (IllegalArgumentException e) {
669        // Expected.
670      }
671      assertTrue(res == null);
672      res = table.get(new Get(Bytes.toBytes("r1-not-exist")));
673      assertTrue(res.isEmpty() == true);
674      res = table.get(new Get(ROW_BYTES));
675      assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
676    }
677  }
678
679  @Test
680  public void testConnectionDefaultUsesCodec() throws Exception {
681    ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection();
682    assertTrue(con.hasCellBlockSupport());
683  }
684
685  @Test
686  public void testPutWithPreBatchMutate() throws Exception {
687    testPreBatchMutate(tableName, () -> {
688      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
689        Put put = new Put(ROW);
690        put.addColumn(FAMILY, QUALIFIER, VALUE);
691        t.put(put);
692      } catch (IOException ex) {
693        throw new RuntimeException(ex);
694      }
695    });
696  }
697
698  @Test
699  public void testRowMutationsWithPreBatchMutate() throws Exception {
700    testPreBatchMutate(tableName, () -> {
701      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
702        RowMutations rm = new RowMutations(ROW, 1);
703        Put put = new Put(ROW);
704        put.addColumn(FAMILY, QUALIFIER, VALUE);
705        rm.add(put);
706        t.mutateRow(rm);
707      } catch (IOException ex) {
708        throw new RuntimeException(ex);
709      }
710    });
711  }
712
713  private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception {
714    HTableDescriptor desc = new HTableDescriptor(tableName);
715    desc.addCoprocessor(WaitingForScanObserver.class.getName());
716    desc.addFamily(new HColumnDescriptor(FAMILY));
717    TEST_UTIL.getAdmin().createTable(desc);
718    // Don't use waitTableAvailable(), because the scanner will mess up the co-processor
719
720    ExecutorService service = Executors.newFixedThreadPool(2);
721    service.execute(rn);
722    final List<Cell> cells = new ArrayList<>();
723    service.execute(() -> {
724      try {
725        // waiting for update.
726        TimeUnit.SECONDS.sleep(3);
727        try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
728          Scan scan = new Scan();
729          try (ResultScanner scanner = t.getScanner(scan)) {
730            for (Result r : scanner) {
731              cells.addAll(Arrays.asList(r.rawCells()));
732            }
733          }
734        }
735      } catch (IOException | InterruptedException ex) {
736        throw new RuntimeException(ex);
737      }
738    });
739    service.shutdown();
740    service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
741    assertEquals("The write is blocking by RegionObserver#postBatchMutate"
742            + ", so the data is invisible to reader", 0, cells.size());
743    TEST_UTIL.deleteTable(tableName);
744  }
745
746  @Test
747  public void testLockLeakWithDelta() throws Exception, Throwable {
748    HTableDescriptor desc = new HTableDescriptor(tableName);
749    desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
750    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
751    desc.addFamily(new HColumnDescriptor(FAMILY));
752    TEST_UTIL.getAdmin().createTable(desc);
753    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
754
755    // new a connection for lower retry number.
756    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
757    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
758    try (Connection con = ConnectionFactory.createConnection(copy)) {
759      HRegion region = (HRegion) find(tableName);
760      region.setTimeoutForWriteLock(10);
761      ExecutorService putService = Executors.newSingleThreadExecutor();
762      putService.execute(() -> {
763        try (Table table = con.getTable(tableName)) {
764          Put put = new Put(ROW);
765          put.addColumn(FAMILY, QUALIFIER, VALUE);
766          // the put will be blocked by WaitingForMultiMutationsObserver.
767          table.put(put);
768        } catch (IOException ex) {
769          throw new RuntimeException(ex);
770        }
771      });
772      ExecutorService appendService = Executors.newSingleThreadExecutor();
773      appendService.execute(() -> {
774        Append append = new Append(ROW);
775        append.addColumn(FAMILY, QUALIFIER, VALUE);
776        try (Table table = con.getTable(tableName)) {
777          table.append(append);
778          fail("The APPEND should fail because the target lock is blocked by previous put");
779        } catch (Exception ex) {
780        }
781      });
782      appendService.shutdown();
783      appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
784      WaitingForMultiMutationsObserver observer =
785              find(tableName, WaitingForMultiMutationsObserver.class);
786      observer.latch.countDown();
787      putService.shutdown();
788      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
789      try (Table table = con.getTable(tableName)) {
790        Result r = table.get(new Get(ROW));
791        assertFalse(r.isEmpty());
792        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
793      }
794    }
795    HRegion region = (HRegion) find(tableName);
796    int readLockCount = region.getReadLockCount();
797    LOG.info("readLockCount:" + readLockCount);
798    assertEquals(0, readLockCount);
799  }
800
801  @Test
802  public void testMultiRowMutations() throws Exception, Throwable {
803    HTableDescriptor desc = new HTableDescriptor(tableName);
804    desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
805    desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
806    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
807    desc.addFamily(new HColumnDescriptor(FAMILY));
808    TEST_UTIL.getAdmin().createTable(desc);
809    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
810
811    // new a connection for lower retry number.
812    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
813    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
814    try (Connection con = ConnectionFactory.createConnection(copy)) {
815      byte[] row = Bytes.toBytes("ROW-0");
816      byte[] rowLocked= Bytes.toBytes("ROW-1");
817      byte[] value0 = Bytes.toBytes("VALUE-0");
818      byte[] value1 = Bytes.toBytes("VALUE-1");
819      byte[] value2 = Bytes.toBytes("VALUE-2");
820      assertNoLocks(tableName);
821      ExecutorService putService = Executors.newSingleThreadExecutor();
822      putService.execute(() -> {
823        try (Table table = con.getTable(tableName)) {
824          Put put0 = new Put(rowLocked);
825          put0.addColumn(FAMILY, QUALIFIER, value0);
826          // the put will be blocked by WaitingForMultiMutationsObserver.
827          table.put(put0);
828        } catch (IOException ex) {
829          throw new RuntimeException(ex);
830        }
831      });
832      ExecutorService cpService = Executors.newSingleThreadExecutor();
833      cpService.execute(() -> {
834        boolean threw;
835        Put put1 = new Put(row);
836        Put put2 = new Put(rowLocked);
837        put1.addColumn(FAMILY, QUALIFIER, value1);
838        put2.addColumn(FAMILY, QUALIFIER, value2);
839        try (Table table = con.getTable(tableName)) {
840          MultiRowMutationProtos.MutateRowsRequest request
841            = MultiRowMutationProtos.MutateRowsRequest.newBuilder()
842              .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
843                      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto
844                              .MutationType.PUT, put1))
845              .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
846                      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto
847                              .MutationType.PUT, put2))
848              .build();
849          table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class,
850            ROW, ROW,
851            (MultiRowMutationProtos.MultiRowMutationService exe) -> {
852              ServerRpcController controller = new ServerRpcController();
853              CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse>
854                rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
855              exe.mutateRows(controller, request, rpcCallback);
856              return rpcCallback.get();
857            });
858          threw = false;
859        } catch (Throwable ex) {
860          threw = true;
861        }
862        if (!threw) {
863          // Can't call fail() earlier because the catch would eat it.
864          fail("This cp should fail because the target lock is blocked by previous put");
865        }
866      });
867      cpService.shutdown();
868      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
869      WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class);
870      observer.latch.countDown();
871      putService.shutdown();
872      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
873      try (Table table = con.getTable(tableName)) {
874        Get g0 = new Get(row);
875        Get g1 = new Get(rowLocked);
876        Result r0 = table.get(g0);
877        Result r1 = table.get(g1);
878        assertTrue(r0.isEmpty());
879        assertFalse(r1.isEmpty());
880        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
881      }
882      assertNoLocks(tableName);
883    }
884  }
885
886  /**
887   * A test case for issue HBASE-17482
888   * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped
889   * onto cells in the append thread, a countdown latch is used to ensure that happened
890   * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698)
891   * make the seqid/mvcc acquirement in handler thread and stamping in append thread
892   * No countdown latch to assure cells in memstore are stamped with seqid/mvcc.
893   * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner
894   * with a smaller readpoint can see these data, which disobey the multi version
895   * concurrency control rules.
896   * This test case is to reproduce this scenario.
897   * @throws IOException
898   */
899  @Test
900  public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException {
901    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
902      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
903      //put two row first to init the scanner
904      Put put = new Put(Bytes.toBytes("0"));
905      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
906      table.put(put);
907      put = new Put(Bytes.toBytes("00"));
908      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
909      table.put(put);
910      Scan scan = new Scan();
911      scan.setTimeRange(0, Long.MAX_VALUE);
912      scan.setCaching(1);
913      ResultScanner scanner = table.getScanner(scan);
914      int rowNum = scanner.next() != null ? 1 : 0;
915      //the started scanner shouldn't see the rows put below
916      for (int i = 1; i < 1000; i++) {
917        put = new Put(Bytes.toBytes(String.valueOf(i)));
918        put.setDurability(Durability.ASYNC_WAL);
919        put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i));
920        table.put(put);
921      }
922      for (Result result : scanner) {
923        rowNum++;
924      }
925      //scanner should only see two rows
926      assertEquals(2, rowNum);
927      scanner = table.getScanner(scan);
928      rowNum = 0;
929      for (Result result : scanner) {
930        rowNum++;
931      }
932      // the new scanner should see all rows
933      assertEquals(1001, rowNum);
934    }
935  }
936
937  @Test
938  public void testPutThenGetWithMultipleThreads() throws Exception {
939    final int THREAD_NUM = 20;
940    final int ROUND_NUM = 10;
941    for (int round = 0; round < ROUND_NUM; round++) {
942      ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
943      final AtomicInteger successCnt = new AtomicInteger(0);
944      try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
945        TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
946
947        for (int i = 0; i < THREAD_NUM; i++) {
948          final int index = i;
949          Thread t = new Thread(new Runnable() {
950
951            @Override
952            public void run() {
953              final byte[] row = Bytes.toBytes("row-" + index);
954              final byte[] value = Bytes.toBytes("v" + index);
955              try {
956                Put put = new Put(row);
957                put.addColumn(FAMILY, QUALIFIER, value);
958                ht.put(put);
959                Get get = new Get(row);
960                Result result = ht.get(get);
961                byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
962                if (Bytes.equals(value, returnedValue)) {
963                  successCnt.getAndIncrement();
964                } else {
965                  LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
966                          + ", returned value: "
967                          + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
968                }
969              } catch (Throwable e) {
970                // do nothing
971              }
972            }
973          });
974          threads.add(t);
975        }
976        for (Thread t : threads) {
977          t.start();
978        }
979        for (Thread t : threads) {
980          t.join();
981        }
982        assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
983      }
984      TEST_UTIL.deleteTable(tableName);
985    }
986  }
987
988  private static void assertNoLocks(final TableName tableName)
989          throws IOException, InterruptedException {
990    HRegion region = (HRegion) find(tableName);
991    assertEquals(0, region.getLockedRows().size());
992  }
993  private static HRegion find(final TableName tableName)
994      throws IOException, InterruptedException {
995    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
996    List<HRegion> regions = rs.getRegions(tableName);
997    assertEquals(1, regions.size());
998    return regions.get(0);
999  }
1000
1001  private static <T extends RegionObserver> T find(final TableName tableName,
1002          Class<T> clz) throws IOException, InterruptedException {
1003    HRegion region = find(tableName);
1004    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
1005    assertTrue("The cp instance should be " + clz.getName()
1006            + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
1007    return clz.cast(cp);
1008  }
1009
1010  public static class WaitingForMultiMutationsObserver
1011      implements RegionCoprocessor, RegionObserver {
1012    final CountDownLatch latch = new CountDownLatch(1);
1013
1014    @Override
1015    public Optional<RegionObserver> getRegionObserver() {
1016      return Optional.of(this);
1017    }
1018
1019    @Override
1020    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1021            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1022      try {
1023        latch.await();
1024      } catch (InterruptedException ex) {
1025        throw new IOException(ex);
1026      }
1027    }
1028  }
1029
1030  public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
1031    private final CountDownLatch latch = new CountDownLatch(1);
1032
1033    @Override
1034    public Optional<RegionObserver> getRegionObserver() {
1035      return Optional.of(this);
1036    }
1037
1038    @Override
1039    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1040            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1041      try {
1042        // waiting for scanner
1043        latch.await();
1044      } catch (InterruptedException ex) {
1045        throw new IOException(ex);
1046      }
1047    }
1048
1049    @Override
1050    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
1051            final Scan scan, final RegionScanner s) throws IOException {
1052      latch.countDown();
1053      return s;
1054    }
1055  }
1056
1057  static byte[] generateHugeValue(int size) {
1058    Random rand = ThreadLocalRandom.current();
1059    byte[] value = new byte[size];
1060    for (int i = 0; i < value.length; i++) {
1061      value[i] = (byte) rand.nextInt(256);
1062    }
1063    return value;
1064  }
1065
1066  @Test
1067  public void testScanWithBatchSizeReturnIncompleteCells()
1068          throws IOException, InterruptedException {
1069    TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
1070      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
1071      .build();
1072    try (Table table = TEST_UTIL.createTable(hd, null)) {
1073      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1074
1075      Put put = new Put(ROW);
1076      put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
1077      table.put(put);
1078
1079      put = new Put(ROW);
1080      put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
1081      table.put(put);
1082
1083      for (int i = 2; i < 5; i++) {
1084        for (int version = 0; version < 2; version++) {
1085          put = new Put(ROW);
1086          put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
1087          table.put(put);
1088        }
1089      }
1090
1091      Scan scan = new Scan();
1092      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3)
1093              .setMaxResultSize(4 * 1024 * 1024);
1094      Result result;
1095      try (ResultScanner scanner = table.getScanner(scan)) {
1096        List<Result> list = new ArrayList<>();
1097        /*
1098         * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB;
1099         * The second scan rpc should return a result with 3 cells, because reach the batch limit
1100         * = 3;
1101         * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
1102         * moreResultsInRegion also would be false. Finally, the client should collect all the cells
1103         * into two result: 2+3 -> 3+2;
1104         */
1105        while ((result = scanner.next()) != null) {
1106          list.add(result);
1107        }
1108
1109        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1110        Assert.assertEquals(2, list.size());
1111        Assert.assertEquals(3, list.get(0).size());
1112        Assert.assertEquals(2, list.get(1).size());
1113      }
1114
1115      scan = new Scan();
1116      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2)
1117              .setMaxResultSize(4 * 1024 * 1024);
1118      try (ResultScanner scanner = table.getScanner(scan)) {
1119        List<Result> list = new ArrayList<>();
1120        while ((result = scanner.next()) != null) {
1121          list.add(result);
1122        }
1123        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1124        Assert.assertEquals(3, list.size());
1125        Assert.assertEquals(2, list.get(0).size());
1126        Assert.assertEquals(2, list.get(1).size());
1127        Assert.assertEquals(1, list.get(2).size());
1128      }
1129    }
1130  }
1131}