001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.List;
031import java.util.Optional;
032import java.util.Random;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicInteger;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.Coprocessor;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HColumnDescriptor;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.HTableDescriptor;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
051import org.apache.hadoop.hbase.coprocessor.ObserverContext;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
054import org.apache.hadoop.hbase.coprocessor.RegionObserver;
055import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
056import org.apache.hadoop.hbase.ipc.ServerRpcController;
057import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
058import org.apache.hadoop.hbase.regionserver.HRegion;
059import org.apache.hadoop.hbase.regionserver.HRegionServer;
060import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
061import org.apache.hadoop.hbase.regionserver.RegionScanner;
062import org.apache.hadoop.hbase.testclassification.ClientTests;
063import org.apache.hadoop.hbase.testclassification.LargeTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.Pair;
066import org.junit.After;
067import org.junit.AfterClass;
068import org.junit.Assert;
069import org.junit.Before;
070import org.junit.BeforeClass;
071import org.junit.ClassRule;
072import org.junit.Rule;
073import org.junit.Test;
074import org.junit.experimental.categories.Category;
075import org.junit.rules.TestName;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
081
082@Category({LargeTests.class, ClientTests.class})
083public class TestFromClientSide3 {
084
085  @ClassRule
086  public static final HBaseClassTestRule CLASS_RULE =
087      HBaseClassTestRule.forClass(TestFromClientSide3.class);
088
089  private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class);
090  private final static HBaseTestingUtility TEST_UTIL
091    = new HBaseTestingUtility();
092  private static byte[] FAMILY = Bytes.toBytes("testFamily");
093  private static Random random = new Random();
094  private static int SLAVES = 3;
095  private static final byte[] ROW = Bytes.toBytes("testRow");
096  private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow");
097  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
098  private static final byte[] VALUE = Bytes.toBytes("testValue");
099  private static final byte[] COL_QUAL = Bytes.toBytes("f1");
100  private static final byte[] VAL_BYTES = Bytes.toBytes("v1");
101  private static final byte[] ROW_BYTES = Bytes.toBytes("r1");
102
103  @Rule
104  public TestName name = new TestName();
105
106  /**
107   * @throws java.lang.Exception
108   */
109  @BeforeClass
110  public static void setUpBeforeClass() throws Exception {
111    TEST_UTIL.startMiniCluster(SLAVES);
112  }
113
114  /**
115   * @throws java.lang.Exception
116   */
117  @AfterClass
118  public static void tearDownAfterClass() throws Exception {
119    TEST_UTIL.shutdownMiniCluster();
120  }
121
122  /**
123   * @throws java.lang.Exception
124   */
125  @Before
126  public void setUp() throws Exception {
127    // Nothing to do.
128  }
129
130  /**
131   * @throws java.lang.Exception
132   */
133  @After
134  public void tearDown() throws Exception {
135    for (HTableDescriptor htd: TEST_UTIL.getAdmin().listTables()) {
136      LOG.info("Tear down, remove table=" + htd.getTableName());
137      TEST_UTIL.deleteTable(htd.getTableName());
138  }
139  }
140
141  private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts)
142      throws Exception {
143    Put put = new Put(row);
144    for (int i = 0; i < nPuts; i++) {
145      byte[] qualifier = Bytes.toBytes(random.nextInt());
146      byte[] value = Bytes.toBytes(random.nextInt());
147      put.addColumn(family, qualifier, value);
148    }
149    table.put(put);
150  }
151
152  private void performMultiplePutAndFlush(HBaseAdmin admin, Table table,
153      byte[] row, byte[] family, int nFlushes, int nPuts)
154  throws Exception {
155
156    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(table.getName())) {
157      // connection needed for poll-wait
158      HRegionLocation loc = locator.getRegionLocation(row, true);
159      AdminProtos.AdminService.BlockingInterface server =
160        ((ClusterConnection) admin.getConnection()).getAdmin(loc.getServerName());
161      byte[] regName = loc.getRegionInfo().getRegionName();
162
163      for (int i = 0; i < nFlushes; i++) {
164        randomCFPuts(table, row, family, nPuts);
165        List<String> sf = ProtobufUtil.getStoreFiles(server, regName, FAMILY);
166        int sfCount = sf.size();
167
168        admin.flush(table.getName());
169      }
170    }
171  }
172
173  private static List<Cell> toList(ResultScanner scanner) {
174    try {
175      List<Cell> cells = new ArrayList<>();
176      for (Result r : scanner) {
177        cells.addAll(r.listCells());
178      }
179      return cells;
180    } finally {
181      scanner.close();
182    }
183  }
184
185  @Test
186  public void testScanAfterDeletingSpecifiedRow() throws IOException {
187    TableName tableName = TableName.valueOf(name.getMethodName());
188    TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
189            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
190            .build();
191    TEST_UTIL.getAdmin().createTable(desc);
192    byte[] row = Bytes.toBytes("SpecifiedRow");
193    byte[] value0 = Bytes.toBytes("value_0");
194    byte[] value1 = Bytes.toBytes("value_1");
195    try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
196      Put put = new Put(row);
197      put.addColumn(FAMILY, QUALIFIER, VALUE);
198      t.put(put);
199      Delete d = new Delete(row);
200      t.delete(d);
201      put = new Put(row);
202      put.addColumn(FAMILY, null, value0);
203      t.put(put);
204      put = new Put(row);
205      put.addColumn(FAMILY, null, value1);
206      t.put(put);
207      List<Cell> cells = toList(t.getScanner(new Scan()));
208      assertEquals(1, cells.size());
209      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
210
211      cells = toList(t.getScanner(new Scan().addFamily(FAMILY)));
212      assertEquals(1, cells.size());
213      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
214
215      cells = toList(t.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
216      assertEquals(0, cells.size());
217
218      TEST_UTIL.getAdmin().flush(tableName);
219      cells = toList(t.getScanner(new Scan()));
220      assertEquals(1, cells.size());
221      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
222
223      cells = toList(t.getScanner(new Scan().addFamily(FAMILY)));
224      assertEquals(1, cells.size());
225      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
226
227      cells = toList(t.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
228      assertEquals(0, cells.size());
229    }
230  }
231
232  @Test
233  public void testScanAfterDeletingSpecifiedRowV2() throws IOException {
234    TableName tableName = TableName.valueOf(name.getMethodName());
235    TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
236            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
237            .build();
238    TEST_UTIL.getAdmin().createTable(desc);
239    byte[] row = Bytes.toBytes("SpecifiedRow");
240    byte[] qual0 = Bytes.toBytes("qual0");
241    byte[] qual1 = Bytes.toBytes("qual1");
242    try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
243      Delete d = new Delete(row);
244      t.delete(d);
245
246      Put put = new Put(row);
247      put.addColumn(FAMILY, null, VALUE);
248      t.put(put);
249
250      put = new Put(row);
251      put.addColumn(FAMILY, qual1, qual1);
252      t.put(put);
253
254      put = new Put(row);
255      put.addColumn(FAMILY, qual0, qual0);
256      t.put(put);
257
258      Result r = t.get(new Get(row));
259      assertEquals(3, r.size());
260      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
261      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
262      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
263
264      TEST_UTIL.getAdmin().flush(tableName);
265      r = t.get(new Get(row));
266      assertEquals(3, r.size());
267      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
268      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
269      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
270    }
271  }
272
273  // override the config settings at the CF level and ensure priority
274  @Test
275  public void testAdvancedConfigOverride() throws Exception {
276    /*
277     * Overall idea: (1) create 3 store files and issue a compaction. config's
278     * compaction.min == 3, so should work. (2) Increase the compaction.min
279     * toggle in the HTD to 5 and modify table. If we use the HTD value instead
280     * of the default config value, adding 3 files and issuing a compaction
281     * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2
282     * and modify table. The CF schema should override the Table schema and now
283     * cause a minor compaction.
284     */
285    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3);
286
287    final TableName tableName = TableName.valueOf(name.getMethodName());
288    Table hTable = TEST_UTIL.createTable(tableName, FAMILY, 10);
289    Admin admin = TEST_UTIL.getAdmin();
290    ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection();
291
292    // Create 3 store files.
293    byte[] row = Bytes.toBytes(random.nextInt());
294    performMultiplePutAndFlush((HBaseAdmin) admin, hTable, row, FAMILY, 3, 100);
295
296    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
297      // Verify we have multiple store files.
298      HRegionLocation loc = locator.getRegionLocation(row, true);
299      byte[] regionName = loc.getRegionInfo().getRegionName();
300      AdminProtos.AdminService.BlockingInterface server = connection.getAdmin(loc.getServerName());
301      assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() > 1);
302
303      // Issue a compaction request
304      admin.compact(tableName);
305
306      // poll wait for the compactions to happen
307      for (int i = 0; i < 10 * 1000 / 40; ++i) {
308        // The number of store files after compaction should be lesser.
309        loc = locator.getRegionLocation(row, true);
310        if (!loc.getRegionInfo().isOffline()) {
311          regionName = loc.getRegionInfo().getRegionName();
312          server = connection.getAdmin(loc.getServerName());
313          if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1) {
314            break;
315          }
316        }
317        Thread.sleep(40);
318      }
319      // verify the compactions took place and that we didn't just time out
320      assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1);
321
322      // change the compaction.min config option for this table to 5
323      LOG.info("hbase.hstore.compaction.min should now be 5");
324      HTableDescriptor htd = new HTableDescriptor(hTable.getTableDescriptor());
325      htd.setValue("hbase.hstore.compaction.min", String.valueOf(5));
326      admin.modifyTable(tableName, htd);
327      Pair<Integer, Integer> st;
328      while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) {
329        LOG.debug(st.getFirst() + " regions left to update");
330        Thread.sleep(40);
331      }
332      LOG.info("alter status finished");
333
334      // Create 3 more store files.
335      performMultiplePutAndFlush((HBaseAdmin) admin, hTable, row, FAMILY, 3, 10);
336
337      // Issue a compaction request
338      admin.compact(tableName);
339
340      // This time, the compaction request should not happen
341      Thread.sleep(10 * 1000);
342      loc = locator.getRegionLocation(row, true);
343      regionName = loc.getRegionInfo().getRegionName();
344      server = connection.getAdmin(loc.getServerName());
345      int sfCount = ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size();
346      assertTrue(sfCount > 1);
347
348      // change an individual CF's config option to 2 & online schema update
349      LOG.info("hbase.hstore.compaction.min should now be 2");
350      HColumnDescriptor hcd = new HColumnDescriptor(htd.getFamily(FAMILY));
351      hcd.setValue("hbase.hstore.compaction.min", String.valueOf(2));
352      htd.modifyFamily(hcd);
353      admin.modifyTable(tableName, htd);
354      while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) {
355        LOG.debug(st.getFirst() + " regions left to update");
356        Thread.sleep(40);
357      }
358      LOG.info("alter status finished");
359
360      // Issue a compaction request
361      admin.compact(tableName);
362
363      // poll wait for the compactions to happen
364      for (int i = 0; i < 10 * 1000 / 40; ++i) {
365        loc = locator.getRegionLocation(row, true);
366        regionName = loc.getRegionInfo().getRegionName();
367        try {
368          server = connection.getAdmin(loc.getServerName());
369          if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() < sfCount) {
370            break;
371          }
372        } catch (Exception e) {
373          LOG.debug("Waiting for region to come online: " + Bytes.toString(regionName));
374        }
375        Thread.sleep(40);
376      }
377
378      // verify the compaction took place and that we didn't just time out
379      assertTrue(ProtobufUtil.getStoreFiles(
380        server, regionName, FAMILY).size() < sfCount);
381
382      // Finally, ensure that we can remove a custom config value after we made it
383      LOG.info("Removing CF config value");
384      LOG.info("hbase.hstore.compaction.min should now be 5");
385      hcd = new HColumnDescriptor(htd.getFamily(FAMILY));
386      hcd.setValue("hbase.hstore.compaction.min", null);
387      htd.modifyFamily(hcd);
388      admin.modifyTable(tableName, htd);
389      while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) {
390        LOG.debug(st.getFirst() + " regions left to update");
391        Thread.sleep(40);
392      }
393      LOG.info("alter status finished");
394      assertNull(hTable.getTableDescriptor().getFamily(FAMILY).getValue(
395          "hbase.hstore.compaction.min"));
396    }
397  }
398
399  @Test
400  public void testHTableBatchWithEmptyPut ()throws Exception {
401      Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
402          new byte[][] { FAMILY });
403    try {
404      List actions = (List) new ArrayList();
405      Object[] results = new Object[2];
406      // create an empty Put
407      Put put1 = new Put(ROW);
408      actions.add(put1);
409
410      Put put2 = new Put(ANOTHERROW);
411      put2.addColumn(FAMILY, QUALIFIER, VALUE);
412      actions.add(put2);
413
414      table.batch(actions, results);
415      fail("Empty Put should have failed the batch call");
416    } catch (IllegalArgumentException iae) {
417
418    } finally {
419      table.close();
420    }
421  }
422
423  // Test Table.batch with large amount of mutations against the same key.
424  // It used to trigger read lock's "Maximum lock count exceeded" Error.
425  @Test
426  public void testHTableWithLargeBatch() throws Exception {
427    Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
428        new byte[][] { FAMILY });
429    int sixtyFourK = 64 * 1024;
430    try {
431      List actions = new ArrayList();
432      Object[] results = new Object[(sixtyFourK + 1) * 2];
433
434      for (int i = 0; i < sixtyFourK + 1; i ++) {
435        Put put1 = new Put(ROW);
436        put1.addColumn(FAMILY, QUALIFIER, VALUE);
437        actions.add(put1);
438
439        Put put2 = new Put(ANOTHERROW);
440        put2.addColumn(FAMILY, QUALIFIER, VALUE);
441        actions.add(put2);
442      }
443
444      table.batch(actions, results);
445    } finally {
446      table.close();
447    }
448  }
449
450  @Test
451  public void testBatchWithRowMutation() throws Exception {
452    LOG.info("Starting testBatchWithRowMutation");
453    final TableName TABLENAME = TableName.valueOf("testBatchWithRowMutation");
454    try (Table t = TEST_UTIL.createTable(TABLENAME, FAMILY)) {
455      byte [][] QUALIFIERS = new byte [][] {
456        Bytes.toBytes("a"), Bytes.toBytes("b")
457      };
458
459      RowMutations arm = RowMutations.of(Collections.singletonList(
460        new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE)));
461      Object[] batchResult = new Object[1];
462      t.batch(Arrays.asList(arm), batchResult);
463
464      Get g = new Get(ROW);
465      Result r = t.get(g);
466      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
467
468      arm = RowMutations.of(Arrays.asList(
469        new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE),
470        new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0])));
471      t.batch(Arrays.asList(arm), batchResult);
472      r = t.get(g);
473      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
474      assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
475
476      // Test that we get the correct remote exception for RowMutations from batch()
477      try {
478        arm = RowMutations.of(Collections.singletonList(
479          new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE)));
480        t.batch(Arrays.asList(arm), batchResult);
481        fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
482      } catch(RetriesExhaustedWithDetailsException e) {
483        String msg = e.getMessage();
484        assertTrue(msg.contains("NoSuchColumnFamilyException"));
485      }
486    }
487  }
488
489  @Test
490  public void testHTableExistsMethodSingleRegionSingleGet() throws Exception {
491      // Test with a single region table.
492      Table table = TEST_UTIL.createTable(
493          TableName.valueOf(name.getMethodName()),
494          new byte[][] { FAMILY });
495
496    Put put = new Put(ROW);
497    put.addColumn(FAMILY, QUALIFIER, VALUE);
498
499    Get get = new Get(ROW);
500
501    boolean exist = table.exists(get);
502    assertFalse(exist);
503
504    table.put(put);
505
506    exist = table.exists(get);
507    assertTrue(exist);
508  }
509
510  @Test
511  public void testHTableExistsMethodSingleRegionMultipleGets() throws Exception {
512    Table table = TEST_UTIL.createTable(TableName.valueOf(
513        name.getMethodName()), new byte[][] { FAMILY });
514
515    Put put = new Put(ROW);
516    put.addColumn(FAMILY, QUALIFIER, VALUE);
517    table.put(put);
518
519    List<Get> gets = new ArrayList<>();
520    gets.add(new Get(ROW));
521    gets.add(new Get(ANOTHERROW));
522
523    boolean[] results = table.exists(gets);
524    assertTrue(results[0]);
525    assertFalse(results[1]);
526  }
527
528  @Test
529  public void testHTableExistsBeforeGet() throws Exception {
530    Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()),
531        new byte[][] { FAMILY });
532    try {
533      Put put = new Put(ROW);
534      put.addColumn(FAMILY, QUALIFIER, VALUE);
535      table.put(put);
536
537      Get get = new Get(ROW);
538
539      boolean exist = table.exists(get);
540      assertEquals(true, exist);
541
542      Result result = table.get(get);
543      assertEquals(false, result.isEmpty());
544      assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER)));
545    } finally {
546      table.close();
547    }
548  }
549
550  @Test
551  public void testHTableExistsAllBeforeGet() throws Exception {
552    final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2"));
553    Table table = TEST_UTIL.createTable(
554        TableName.valueOf(name.getMethodName()), new byte[][] { FAMILY });
555    try {
556      Put put = new Put(ROW);
557      put.addColumn(FAMILY, QUALIFIER, VALUE);
558      table.put(put);
559      put = new Put(ROW2);
560      put.addColumn(FAMILY, QUALIFIER, VALUE);
561      table.put(put);
562
563      Get get = new Get(ROW);
564      Get get2 = new Get(ROW2);
565      ArrayList<Get> getList = new ArrayList(2);
566      getList.add(get);
567      getList.add(get2);
568
569      boolean[] exists = table.existsAll(getList);
570      assertEquals(true, exists[0]);
571      assertEquals(true, exists[1]);
572
573      Result[] result = table.get(getList);
574      assertEquals(false, result[0].isEmpty());
575      assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER)));
576      assertEquals(false, result[1].isEmpty());
577      assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER)));
578    } finally {
579      table.close();
580    }
581  }
582
583  @Test
584  public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception {
585    Table table = TEST_UTIL.createTable(
586      TableName.valueOf(name.getMethodName()), new byte[][] { FAMILY },
587      1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255);
588    Put put = new Put(ROW);
589    put.addColumn(FAMILY, QUALIFIER, VALUE);
590
591    Get get = new Get(ROW);
592
593    boolean exist = table.exists(get);
594    assertFalse(exist);
595
596    table.put(put);
597
598    exist = table.exists(get);
599    assertTrue(exist);
600  }
601
602  @Test
603  public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception {
604    Table table = TEST_UTIL.createTable(
605      TableName.valueOf(name.getMethodName()),
606      new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255);
607    Put put = new Put(ROW);
608    put.addColumn(FAMILY, QUALIFIER, VALUE);
609    table.put (put);
610
611    List<Get> gets = new ArrayList<>();
612    gets.add(new Get(ANOTHERROW));
613    gets.add(new Get(Bytes.add(ROW, new byte[] { 0x00 })));
614    gets.add(new Get(ROW));
615    gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 })));
616
617    LOG.info("Calling exists");
618    boolean[] results = table.existsAll(gets);
619    assertFalse(results[0]);
620    assertFalse(results[1]);
621    assertTrue(results[2]);
622    assertFalse(results[3]);
623
624    // Test with the first region.
625    put = new Put(new byte[] { 0x00 });
626    put.addColumn(FAMILY, QUALIFIER, VALUE);
627    table.put(put);
628
629    gets = new ArrayList<>();
630    gets.add(new Get(new byte[] { 0x00 }));
631    gets.add(new Get(new byte[] { 0x00, 0x00 }));
632    results = table.existsAll(gets);
633    assertTrue(results[0]);
634    assertFalse(results[1]);
635
636    // Test with the last region
637    put = new Put(new byte[] { (byte) 0xff, (byte) 0xff });
638    put.addColumn(FAMILY, QUALIFIER, VALUE);
639    table.put(put);
640
641    gets = new ArrayList<>();
642    gets.add(new Get(new byte[] { (byte) 0xff }));
643    gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff }));
644    gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff }));
645    results = table.existsAll(gets);
646    assertFalse(results[0]);
647    assertTrue(results[1]);
648    assertFalse(results[2]);
649  }
650
651  @Test
652  public void testGetEmptyRow() throws Exception {
653    //Create a table and put in 1 row
654    Admin admin = TEST_UTIL.getAdmin();
655    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes.toBytes(name.getMethodName())));
656    desc.addFamily(new HColumnDescriptor(FAMILY));
657    admin.createTable(desc);
658    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
659
660    Put put = new Put(ROW_BYTES);
661    put.addColumn(FAMILY, COL_QUAL, VAL_BYTES);
662    table.put(put);
663
664    //Try getting the row with an empty row key
665    Result res = null;
666    try {
667      res = table.get(new Get(new byte[0]));
668      fail();
669    } catch (IllegalArgumentException e) {
670      // Expected.
671    }
672    assertTrue(res == null);
673    res = table.get(new Get(Bytes.toBytes("r1-not-exist")));
674    assertTrue(res.isEmpty() == true);
675    res = table.get(new Get(ROW_BYTES));
676    assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
677    table.close();
678  }
679
680  @Test
681  public void testConnectionDefaultUsesCodec() throws Exception {
682    ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection();
683    assertTrue(con.hasCellBlockSupport());
684  }
685
686  @Test
687  public void testPutWithPreBatchMutate() throws Exception {
688    final TableName tableName = TableName.valueOf(name.getMethodName());
689    testPreBatchMutate(tableName, () -> {
690      try {
691        Table t = TEST_UTIL.getConnection().getTable(tableName);
692        Put put = new Put(ROW);
693        put.addColumn(FAMILY, QUALIFIER, VALUE);
694        t.put(put);
695      } catch (IOException ex) {
696        throw new RuntimeException(ex);
697      }
698    });
699  }
700
701  @Test
702  public void testRowMutationsWithPreBatchMutate() throws Exception {
703    final TableName tableName = TableName.valueOf(name.getMethodName());
704    testPreBatchMutate(tableName, () -> {
705      try {
706        RowMutations rm = new RowMutations(ROW, 1);
707        Table t = TEST_UTIL.getConnection().getTable(tableName);
708        Put put = new Put(ROW);
709        put.addColumn(FAMILY, QUALIFIER, VALUE);
710        rm.add(put);
711        t.mutateRow(rm);
712      } catch (IOException ex) {
713        throw new RuntimeException(ex);
714      }
715    });
716  }
717
718  private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception {
719    HTableDescriptor desc = new HTableDescriptor(tableName);
720    desc.addCoprocessor(WaitingForScanObserver.class.getName());
721    desc.addFamily(new HColumnDescriptor(FAMILY));
722    TEST_UTIL.getAdmin().createTable(desc);
723    ExecutorService service = Executors.newFixedThreadPool(2);
724    service.execute(rn);
725    final List<Cell> cells = new ArrayList<>();
726    service.execute(() -> {
727      try {
728        // waiting for update.
729        TimeUnit.SECONDS.sleep(3);
730        Table t = TEST_UTIL.getConnection().getTable(tableName);
731        Scan scan = new Scan();
732        try (ResultScanner scanner = t.getScanner(scan)) {
733          for (Result r : scanner) {
734            cells.addAll(Arrays.asList(r.rawCells()));
735          }
736        }
737      } catch (IOException | InterruptedException ex) {
738        throw new RuntimeException(ex);
739      }
740    });
741    service.shutdown();
742    service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
743    assertEquals("The write is blocking by RegionObserver#postBatchMutate"
744      + ", so the data is invisible to reader", 0, cells.size());
745    TEST_UTIL.deleteTable(tableName);
746  }
747
748  @Test
749  public void testLockLeakWithDelta() throws Exception, Throwable {
750    final TableName tableName = TableName.valueOf(name.getMethodName());
751    HTableDescriptor desc = new HTableDescriptor(tableName);
752    desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
753    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
754    desc.addFamily(new HColumnDescriptor(FAMILY));
755    TEST_UTIL.getAdmin().createTable(desc);
756    // new a connection for lower retry number.
757    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
758    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
759    try (Connection con = ConnectionFactory.createConnection(copy)) {
760      HRegion region = (HRegion) find(tableName);
761      region.setTimeoutForWriteLock(10);
762      ExecutorService putService = Executors.newSingleThreadExecutor();
763      putService.execute(() -> {
764        try (Table table = con.getTable(tableName)) {
765          Put put = new Put(ROW);
766          put.addColumn(FAMILY, QUALIFIER, VALUE);
767          // the put will be blocked by WaitingForMultiMutationsObserver.
768          table.put(put);
769        } catch (IOException ex) {
770          throw new RuntimeException(ex);
771        }
772      });
773      ExecutorService appendService = Executors.newSingleThreadExecutor();
774      appendService.execute(() -> {
775        Append append = new Append(ROW);
776        append.addColumn(FAMILY, QUALIFIER, VALUE);
777        try (Table table = con.getTable(tableName)) {
778          table.append(append);
779          fail("The APPEND should fail because the target lock is blocked by previous put");
780        } catch (Exception ex) {
781        }
782      });
783      appendService.shutdown();
784      appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
785      WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class);
786      observer.latch.countDown();
787      putService.shutdown();
788      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
789      try (Table table = con.getTable(tableName)) {
790        Result r = table.get(new Get(ROW));
791        assertFalse(r.isEmpty());
792        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
793      }
794    }
795    HRegion region = (HRegion) find(tableName);
796    int readLockCount = region.getReadLockCount();
797    LOG.info("readLockCount:" + readLockCount);
798    assertEquals(0, readLockCount);
799  }
800
801  @Test
802  public void testMultiRowMutations() throws Exception, Throwable {
803    final TableName tableName = TableName.valueOf(name.getMethodName());
804    HTableDescriptor desc = new HTableDescriptor(tableName);
805    desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
806    desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
807    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
808    desc.addFamily(new HColumnDescriptor(FAMILY));
809    TEST_UTIL.getAdmin().createTable(desc);
810    // new a connection for lower retry number.
811    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
812    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
813    try (Connection con = ConnectionFactory.createConnection(copy)) {
814      byte[] row = Bytes.toBytes("ROW-0");
815      byte[] rowLocked= Bytes.toBytes("ROW-1");
816      byte[] value0 = Bytes.toBytes("VALUE-0");
817      byte[] value1 = Bytes.toBytes("VALUE-1");
818      byte[] value2 = Bytes.toBytes("VALUE-2");
819      assertNoLocks(tableName);
820      ExecutorService putService = Executors.newSingleThreadExecutor();
821      putService.execute(() -> {
822        try (Table table = con.getTable(tableName)) {
823          Put put0 = new Put(rowLocked);
824          put0.addColumn(FAMILY, QUALIFIER, value0);
825          // the put will be blocked by WaitingForMultiMutationsObserver.
826          table.put(put0);
827        } catch (IOException ex) {
828          throw new RuntimeException(ex);
829        }
830      });
831      ExecutorService cpService = Executors.newSingleThreadExecutor();
832      cpService.execute(() -> {
833        boolean threw;
834        Put put1 = new Put(row);
835        Put put2 = new Put(rowLocked);
836        put1.addColumn(FAMILY, QUALIFIER, value1);
837        put2.addColumn(FAMILY, QUALIFIER, value2);
838        try (Table table = con.getTable(tableName)) {
839          MultiRowMutationProtos.MutateRowsRequest request
840            = MultiRowMutationProtos.MutateRowsRequest.newBuilder()
841              .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
842                      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put1))
843              .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
844                      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, put2))
845              .build();
846          table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class,
847            ROW, ROW,
848            (MultiRowMutationProtos.MultiRowMutationService exe) -> {
849              ServerRpcController controller = new ServerRpcController();
850              CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse>
851                rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
852              exe.mutateRows(controller, request, rpcCallback);
853              return rpcCallback.get();
854            });
855          threw = false;
856        } catch (Throwable ex) {
857          threw = true;
858        }
859        if (!threw) {
860          // Can't call fail() earlier because the catch would eat it.
861          fail("This cp should fail because the target lock is blocked by previous put");
862        }
863      });
864      cpService.shutdown();
865      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
866      WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class);
867      observer.latch.countDown();
868      putService.shutdown();
869      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
870      try (Table table = con.getTable(tableName)) {
871        Get g0 = new Get(row);
872        Get g1 = new Get(rowLocked);
873        Result r0 = table.get(g0);
874        Result r1 = table.get(g1);
875        assertTrue(r0.isEmpty());
876        assertFalse(r1.isEmpty());
877        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
878      }
879      assertNoLocks(tableName);
880    }
881  }
882
883  /**
884   * A test case for issue HBASE-17482
885   * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped
886   * onto cells in the append thread, a countdown latch is used to ensure that happened
887   * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698)
888   * make the seqid/mvcc acquirement in handler thread and stamping in append thread
889   * No countdown latch to assure cells in memstore are stamped with seqid/mvcc.
890   * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner
891   * with a smaller readpoint can see these data, which disobey the multi version
892   * concurrency control rules.
893   * This test case is to reproduce this scenario.
894   * @throws IOException
895   */
896  @Test
897  public void testMVCCUsingMVCCPreAssign() throws IOException {
898    final TableName tableName = TableName.valueOf(name.getMethodName());
899    HTableDescriptor htd = new HTableDescriptor(tableName);
900    HColumnDescriptor fam = new HColumnDescriptor(FAMILY);
901    htd.addFamily(fam);
902    Admin admin = TEST_UTIL.getAdmin();
903    admin.createTable(htd);
904    Table table = admin.getConnection().getTable(TableName.valueOf(name.getMethodName()));
905    //put two row first to init the scanner
906    Put put = new Put(Bytes.toBytes("0"));
907    put.addColumn(FAMILY, Bytes.toBytes( ""), Bytes.toBytes("0"));
908    table.put(put);
909    put = new Put(Bytes.toBytes("00"));
910    put.addColumn(FAMILY, Bytes.toBytes( ""), Bytes.toBytes("0"));
911    table.put(put);
912    Scan scan = new Scan();
913    scan.setTimeRange(0, Long.MAX_VALUE);
914    scan.setCaching(1);
915    ResultScanner scanner = table.getScanner(scan);
916    int rowNum = scanner.next() != null ? 1 : 0;
917    //the started scanner shouldn't see the rows put below
918    for(int i = 1; i < 1000; i++) {
919      put = new Put(Bytes.toBytes(String.valueOf(i)));
920      put.setDurability(Durability.ASYNC_WAL);
921      put.addColumn(FAMILY, Bytes.toBytes( ""), Bytes.toBytes(i));
922      table.put(put);
923    }
924    for(Result result : scanner) {
925      rowNum++;
926    }
927    //scanner should only see two rows
928    assertEquals(2, rowNum);
929    scanner = table.getScanner(scan);
930    rowNum = 0;
931    for(Result result : scanner) {
932      rowNum++;
933    }
934    // the new scanner should see all rows
935    assertEquals(1001, rowNum);
936
937
938  }
939
940  @Test
941  public void testPutThenGetWithMultipleThreads() throws Exception {
942    final TableName tableName = TableName.valueOf(name.getMethodName());
943    final int THREAD_NUM = 20;
944    final int ROUND_NUM = 10;
945    for (int round = 0; round < ROUND_NUM; round++) {
946      ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
947      final AtomicInteger successCnt = new AtomicInteger(0);
948      Table ht = TEST_UTIL.createTable(tableName, FAMILY);
949      for (int i = 0; i < THREAD_NUM; i++) {
950        final int index = i;
951        Thread t = new Thread(new Runnable() {
952
953          @Override
954          public void run() {
955            final byte[] row = Bytes.toBytes("row-" + index);
956            final byte[] value = Bytes.toBytes("v" + index);
957            try {
958              Put put = new Put(row);
959              put.addColumn(FAMILY, QUALIFIER, value);
960              ht.put(put);
961              Get get = new Get(row);
962              Result result = ht.get(get);
963              byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
964              if (Bytes.equals(value, returnedValue)) {
965                successCnt.getAndIncrement();
966              } else {
967                LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
968                    + ", returned value: "
969                    + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
970              }
971            } catch (Throwable e) {
972              // do nothing
973            }
974          }
975        });
976        threads.add(t);
977      }
978      for (Thread t : threads) {
979        t.start();
980      }
981      for (Thread t : threads) {
982        t.join();
983      }
984      assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
985      ht.close();
986      TEST_UTIL.deleteTable(tableName);
987    }
988  }
989
990  private static void assertNoLocks(final TableName tableName) throws IOException, InterruptedException {
991    HRegion region = (HRegion) find(tableName);
992    assertEquals(0, region.getLockedRows().size());
993  }
994  private static HRegion find(final TableName tableName)
995      throws IOException, InterruptedException {
996    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
997    List<HRegion> regions = rs.getRegions(tableName);
998    assertEquals(1, regions.size());
999    return regions.get(0);
1000  }
1001
1002  private static <T extends RegionObserver> T find(final TableName tableName,
1003          Class<T> clz) throws IOException, InterruptedException {
1004    HRegion region = find(tableName);
1005    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
1006    assertTrue("The cp instance should be " + clz.getName()
1007            + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
1008    return clz.cast(cp);
1009  }
1010
1011  public static class WaitingForMultiMutationsObserver
1012      implements RegionCoprocessor, RegionObserver {
1013    final CountDownLatch latch = new CountDownLatch(1);
1014
1015    @Override
1016    public Optional<RegionObserver> getRegionObserver() {
1017      return Optional.of(this);
1018    }
1019
1020    @Override
1021    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1022            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1023      try {
1024        latch.await();
1025      } catch (InterruptedException ex) {
1026        throw new IOException(ex);
1027      }
1028    }
1029  }
1030
1031  public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
1032    private final CountDownLatch latch = new CountDownLatch(1);
1033
1034    @Override
1035    public Optional<RegionObserver> getRegionObserver() {
1036      return Optional.of(this);
1037    }
1038
1039    @Override
1040    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1041            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1042      try {
1043        // waiting for scanner
1044        latch.await();
1045      } catch (InterruptedException ex) {
1046        throw new IOException(ex);
1047      }
1048    }
1049
1050    @Override
1051    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
1052            final Scan scan, final RegionScanner s) throws IOException {
1053      latch.countDown();
1054      return s;
1055    }
1056  }
1057
1058  static byte[] generateHugeValue(int size) {
1059    Random rand = ThreadLocalRandom.current();
1060    byte[] value = new byte[size];
1061    for (int i = 0; i < value.length; i++) {
1062      value[i] = (byte) rand.nextInt(256);
1063    }
1064    return value;
1065  }
1066
1067  @Test
1068  public void testScanWithBatchSizeReturnIncompleteCells() throws IOException {
1069    TableName tableName = TableName.valueOf(name.getMethodName());
1070    TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
1071        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
1072        .build();
1073
1074    Table table = TEST_UTIL.createTable(hd, null);
1075
1076    Put put = new Put(ROW);
1077    put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
1078    table.put(put);
1079
1080    put = new Put(ROW);
1081    put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
1082    table.put(put);
1083
1084    for (int i = 2; i < 5; i++) {
1085      for (int version = 0; version < 2; version++) {
1086        put = new Put(ROW);
1087        put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
1088        table.put(put);
1089      }
1090    }
1091
1092    Scan scan = new Scan();
1093    scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3)
1094        .setMaxResultSize(4 * 1024 * 1024);
1095    Result result;
1096    try (ResultScanner scanner = table.getScanner(scan)) {
1097      List<Result> list = new ArrayList<>();
1098      /*
1099       * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The second
1100       * scan rpc should return a result with 3 cells, because reach the batch limit = 3; The
1101       * mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
1102       * moreResultsInRegion also would be false. Finally, the client should collect all the cells
1103       * into two result: 2+3 -> 3+2;
1104       */
1105      while ((result = scanner.next()) != null) {
1106        list.add(result);
1107      }
1108
1109      Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1110      Assert.assertEquals(2, list.size());
1111      Assert.assertEquals(3, list.get(0).size());
1112      Assert.assertEquals(2, list.get(1).size());
1113    }
1114
1115    scan = new Scan();
1116    scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2)
1117        .setMaxResultSize(4 * 1024 * 1024);
1118    try (ResultScanner scanner = table.getScanner(scan)) {
1119      List<Result> list = new ArrayList<>();
1120      while ((result = scanner.next()) != null) {
1121        list.add(result);
1122      }
1123      Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1124      Assert.assertEquals(3, list.size());
1125      Assert.assertEquals(2, list.get(0).size());
1126      Assert.assertEquals(2, list.get(1).size());
1127      Assert.assertEquals(1, list.get(2).size());
1128    }
1129  }
1130}