/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.List;
031import java.util.Optional;
032import java.util.Random;
033import java.util.concurrent.CountDownLatch;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadLocalRandom;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.atomic.AtomicInteger;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.Coprocessor;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.HColumnDescriptor;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.HTableDescriptor;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
051import org.apache.hadoop.hbase.coprocessor.ObserverContext;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
054import org.apache.hadoop.hbase.coprocessor.RegionObserver;
055import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
056import org.apache.hadoop.hbase.ipc.ServerRpcController;
057import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
058import org.apache.hadoop.hbase.regionserver.HRegion;
059import org.apache.hadoop.hbase.regionserver.HRegionServer;
060import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
061import org.apache.hadoop.hbase.regionserver.RegionScanner;
062import org.apache.hadoop.hbase.testclassification.ClientTests;
063import org.apache.hadoop.hbase.testclassification.LargeTests;
064import org.apache.hadoop.hbase.util.Bytes;
065import org.apache.hadoop.hbase.util.Pair;
066import org.junit.After;
067import org.junit.AfterClass;
068import org.junit.Assert;
069import org.junit.Before;
070import org.junit.BeforeClass;
071import org.junit.ClassRule;
072import org.junit.Rule;
073import org.junit.Test;
074import org.junit.experimental.categories.Category;
075import org.junit.rules.TestName;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
079import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
081
082@Category({LargeTests.class, ClientTests.class})
083public class TestFromClientSide3 {
084
085  @ClassRule
086  public static final HBaseClassTestRule CLASS_RULE =
087      HBaseClassTestRule.forClass(TestFromClientSide3.class);
088
089  private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class);
090  private final static HBaseTestingUtility TEST_UTIL
091    = new HBaseTestingUtility();
092  private static final int WAITTABLE_MILLIS = 10000;
093  private static byte[] FAMILY = Bytes.toBytes("testFamily");
094  private static Random random = new Random();
095  private static int SLAVES = 3;
096  private static final byte[] ROW = Bytes.toBytes("testRow");
097  private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow");
098  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
099  private static final byte[] VALUE = Bytes.toBytes("testValue");
100  private static final byte[] COL_QUAL = Bytes.toBytes("f1");
101  private static final byte[] VAL_BYTES = Bytes.toBytes("v1");
102  private static final byte[] ROW_BYTES = Bytes.toBytes("r1");
103
104  @Rule
105  public TestName name = new TestName();
106  private TableName tableName;
107
108  /**
109   * @throws java.lang.Exception
110   */
111  @BeforeClass
112  public static void setUpBeforeClass() throws Exception {
113    TEST_UTIL.startMiniCluster(SLAVES);
114  }
115
116  /**
117   * @throws java.lang.Exception
118   */
119  @AfterClass
120  public static void tearDownAfterClass() throws Exception {
121    TEST_UTIL.shutdownMiniCluster();
122  }
123
124  /**
125   * @throws java.lang.Exception
126   */
127  @Before
128  public void setUp() throws Exception {
129    tableName = TableName.valueOf(name.getMethodName());
130  }
131
132  /**
133   * @throws java.lang.Exception
134   */
135  @After
136  public void tearDown() throws Exception {
137    for (HTableDescriptor htd: TEST_UTIL.getAdmin().listTables()) {
138      LOG.info("Tear down, remove table=" + htd.getTableName());
139      TEST_UTIL.deleteTable(htd.getTableName());
140  }
141  }
142
143  private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts)
144      throws Exception {
145    Put put = new Put(row);
146    for (int i = 0; i < nPuts; i++) {
147      byte[] qualifier = Bytes.toBytes(random.nextInt());
148      byte[] value = Bytes.toBytes(random.nextInt());
149      put.addColumn(family, qualifier, value);
150    }
151    table.put(put);
152  }
153
154  private void performMultiplePutAndFlush(HBaseAdmin admin, Table table,
155      byte[] row, byte[] family, int nFlushes, int nPuts)
156  throws Exception {
157
158    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(table.getName())) {
159      // connection needed for poll-wait
160      HRegionLocation loc = locator.getRegionLocation(row, true);
161      AdminProtos.AdminService.BlockingInterface server =
162        ((ClusterConnection) admin.getConnection()).getAdmin(loc.getServerName());
163      byte[] regName = loc.getRegionInfo().getRegionName();
164
165      for (int i = 0; i < nFlushes; i++) {
166        randomCFPuts(table, row, family, nPuts);
167        List<String> sf = ProtobufUtil.getStoreFiles(server, regName, FAMILY);
168        int sfCount = sf.size();
169
170        admin.flush(table.getName());
171      }
172    }
173  }
174
175  private static List<Cell> toList(ResultScanner scanner) {
176    try {
177      List<Cell> cells = new ArrayList<>();
178      for (Result r : scanner) {
179        cells.addAll(r.listCells());
180      }
181      return cells;
182    } finally {
183      scanner.close();
184    }
185  }
186
187  @Test
188  public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException {
189    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
190      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
191      byte[] row = Bytes.toBytes("SpecifiedRow");
192      byte[] value0 = Bytes.toBytes("value_0");
193      byte[] value1 = Bytes.toBytes("value_1");
194      Put put = new Put(row);
195      put.addColumn(FAMILY, QUALIFIER, VALUE);
196      table.put(put);
197      Delete d = new Delete(row);
198      table.delete(d);
199      put = new Put(row);
200      put.addColumn(FAMILY, null, value0);
201      table.put(put);
202      put = new Put(row);
203      put.addColumn(FAMILY, null, value1);
204      table.put(put);
205      List<Cell> cells = toList(table.getScanner(new Scan()));
206      assertEquals(1, cells.size());
207      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
208
209      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
210      assertEquals(1, cells.size());
211      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
212
213      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
214      assertEquals(0, cells.size());
215
216      TEST_UTIL.getAdmin().flush(tableName);
217      cells = toList(table.getScanner(new Scan()));
218      assertEquals(1, cells.size());
219      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
220
221      cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
222      assertEquals(1, cells.size());
223      assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
224
225      cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
226      assertEquals(0, cells.size());
227    }
228  }
229
230  @Test
231  public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException {
232    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
233      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
234      byte[] row = Bytes.toBytes("SpecifiedRow");
235      byte[] qual0 = Bytes.toBytes("qual0");
236      byte[] qual1 = Bytes.toBytes("qual1");
237      long now = System.currentTimeMillis();
238      Delete d = new Delete(row, now);
239      table.delete(d);
240
241      Put put = new Put(row);
242      put.addColumn(FAMILY, null, now + 1, VALUE);
243      table.put(put);
244
245      put = new Put(row);
246      put.addColumn(FAMILY, qual1, now + 2, qual1);
247      table.put(put);
248
249      put = new Put(row);
250      put.addColumn(FAMILY, qual0, now + 3, qual0);
251      table.put(put);
252
253      Result r = table.get(new Get(row));
254      assertEquals(r.toString(), 3, r.size());
255      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
256      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
257      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
258
259      TEST_UTIL.getAdmin().flush(tableName);
260      r = table.get(new Get(row));
261      assertEquals(3, r.size());
262      assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
263      assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
264      assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
265    }
266  }
267
268  // override the config settings at the CF level and ensure priority
269  @Test
270  public void testAdvancedConfigOverride() throws Exception {
271    /*
272     * Overall idea: (1) create 3 store files and issue a compaction. config's
273     * compaction.min == 3, so should work. (2) Increase the compaction.min
274     * toggle in the HTD to 5 and modify table. If we use the HTD value instead
275     * of the default config value, adding 3 files and issuing a compaction
276     * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2
277     * and modify table. The CF schema should override the Table schema and now
278     * cause a minor compaction.
279     */
280    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3);
281
282    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
283      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
284      try (Admin admin = TEST_UTIL.getAdmin()) {
285        ClusterConnection connection = (ClusterConnection) TEST_UTIL.getConnection();
286
287        // Create 3 store files.
288        byte[] row = Bytes.toBytes(random.nextInt());
289        performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 100);
290
291        try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
292          // Verify we have multiple store files.
293          HRegionLocation loc = locator.getRegionLocation(row, true);
294          byte[] regionName = loc.getRegionInfo().getRegionName();
295          AdminProtos.AdminService.BlockingInterface server =
296                  connection.getAdmin(loc.getServerName());
297          assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() > 1);
298
299          // Issue a compaction request
300          admin.compact(tableName);
301
302          // poll wait for the compactions to happen
303          for (int i = 0; i < 10 * 1000 / 40; ++i) {
304            // The number of store files after compaction should be lesser.
305            loc = locator.getRegionLocation(row, true);
306            if (!loc.getRegionInfo().isOffline()) {
307              regionName = loc.getRegionInfo().getRegionName();
308              server = connection.getAdmin(loc.getServerName());
309              if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1) {
310                break;
311              }
312            }
313            Thread.sleep(40);
314          }
315          // verify the compactions took place and that we didn't just time out
316          assertTrue(ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() <= 1);
317
318          // change the compaction.min config option for this table to 5
319          LOG.info("hbase.hstore.compaction.min should now be 5");
320          HTableDescriptor htd = new HTableDescriptor(table.getTableDescriptor());
321          htd.setValue("hbase.hstore.compaction.min", String.valueOf(5));
322          admin.modifyTable(tableName, htd);
323          Pair<Integer, Integer> st = admin.getAlterStatus(tableName);
324          while (null != st && st.getFirst() > 0) {
325            LOG.debug(st.getFirst() + " regions left to update");
326            Thread.sleep(40);
327            st = admin.getAlterStatus(tableName);
328          }
329          LOG.info("alter status finished");
330
331          // Create 3 more store files.
332          performMultiplePutAndFlush((HBaseAdmin) admin, table, row, FAMILY, 3, 10);
333
334          // Issue a compaction request
335          admin.compact(tableName);
336
337          // This time, the compaction request should not happen
338          Thread.sleep(10 * 1000);
339          loc = locator.getRegionLocation(row, true);
340          regionName = loc.getRegionInfo().getRegionName();
341          server = connection.getAdmin(loc.getServerName());
342          int sfCount = ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size();
343          assertTrue(sfCount > 1);
344
345          // change an individual CF's config option to 2 & online schema update
346          LOG.info("hbase.hstore.compaction.min should now be 2");
347          HColumnDescriptor hcd = new HColumnDescriptor(htd.getFamily(FAMILY));
348          hcd.setValue("hbase.hstore.compaction.min", String.valueOf(2));
349          htd.modifyFamily(hcd);
350          admin.modifyTable(tableName, htd);
351          st = admin.getAlterStatus(tableName);
352          while (null != st && st.getFirst() > 0) {
353            LOG.debug(st.getFirst() + " regions left to update");
354            Thread.sleep(40);
355            st = admin.getAlterStatus(tableName);
356          }
357          LOG.info("alter status finished");
358
359          // Issue a compaction request
360          admin.compact(tableName);
361
362          // poll wait for the compactions to happen
363          for (int i = 0; i < 10 * 1000 / 40; ++i) {
364            loc = locator.getRegionLocation(row, true);
365            regionName = loc.getRegionInfo().getRegionName();
366            try {
367              server = connection.getAdmin(loc.getServerName());
368              if (ProtobufUtil.getStoreFiles(server, regionName, FAMILY).size() < sfCount) {
369                break;
370              }
371            } catch (Exception e) {
372              LOG.debug("Waiting for region to come online: " + Bytes.toString(regionName));
373            }
374            Thread.sleep(40);
375          }
376
377          // verify the compaction took place and that we didn't just time out
378          assertTrue(ProtobufUtil.getStoreFiles(
379                  server, regionName, FAMILY).size() < sfCount);
380
381          // Finally, ensure that we can remove a custom config value after we made it
382          LOG.info("Removing CF config value");
383          LOG.info("hbase.hstore.compaction.min should now be 5");
384          hcd = new HColumnDescriptor(htd.getFamily(FAMILY));
385          hcd.setValue("hbase.hstore.compaction.min", null);
386          htd.modifyFamily(hcd);
387          admin.modifyTable(tableName, htd);
388          st = admin.getAlterStatus(tableName);
389          while (null != st && st.getFirst() > 0) {
390            LOG.debug(st.getFirst() + " regions left to update");
391            Thread.sleep(40);
392            st = admin.getAlterStatus(tableName);
393          }
394          LOG.info("alter status finished");
395          assertNull(table.getTableDescriptor().getFamily(FAMILY).getValue(
396                  "hbase.hstore.compaction.min"));
397        }
398      }
399    }
400  }
401
402  @Test
403  public void testHTableBatchWithEmptyPut() throws IOException, InterruptedException {
404    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
405      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
406      List actions = (List) new ArrayList();
407      Object[] results = new Object[2];
408      // create an empty Put
409      Put put1 = new Put(ROW);
410      actions.add(put1);
411
412      Put put2 = new Put(ANOTHERROW);
413      put2.addColumn(FAMILY, QUALIFIER, VALUE);
414      actions.add(put2);
415
416      table.batch(actions, results);
417      fail("Empty Put should have failed the batch call");
418    } catch (IllegalArgumentException iae) {
419    }
420  }
421
422  // Test Table.batch with large amount of mutations against the same key.
423  // It used to trigger read lock's "Maximum lock count exceeded" Error.
424  @Test
425  public void testHTableWithLargeBatch() throws IOException, InterruptedException {
426    int sixtyFourK = 64 * 1024;
427    List actions = new ArrayList();
428    Object[] results = new Object[(sixtyFourK + 1) * 2];
429
430    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
431      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
432
433      for (int i = 0; i < sixtyFourK + 1; i++) {
434        Put put1 = new Put(ROW);
435        put1.addColumn(FAMILY, QUALIFIER, VALUE);
436        actions.add(put1);
437
438        Put put2 = new Put(ANOTHERROW);
439        put2.addColumn(FAMILY, QUALIFIER, VALUE);
440        actions.add(put2);
441      }
442
443      table.batch(actions, results);
444    }
445  }
446
447  @Test
448  public void testBatchWithRowMutation() throws Exception {
449    LOG.info("Starting testBatchWithRowMutation");
450    byte [][] QUALIFIERS = new byte [][] {
451      Bytes.toBytes("a"), Bytes.toBytes("b")
452    };
453
454    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
455      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
456
457      RowMutations arm = RowMutations.of(Collections.singletonList(
458              new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE)));
459      Object[] batchResult = new Object[1];
460      table.batch(Arrays.asList(arm), batchResult);
461
462      Get g = new Get(ROW);
463      Result r = table.get(g);
464      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
465
466      arm = RowMutations.of(Arrays.asList(
467              new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE),
468              new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0])));
469      table.batch(Arrays.asList(arm), batchResult);
470      r = table.get(g);
471      assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
472      assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
473
474      // Test that we get the correct remote exception for RowMutations from batch()
475      try {
476        arm = RowMutations.of(Collections.singletonList(
477                new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE)));
478        table.batch(Arrays.asList(arm), batchResult);
479        fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
480      } catch (RetriesExhaustedWithDetailsException e) {
481        String msg = e.getMessage();
482        assertTrue(msg.contains("NoSuchColumnFamilyException"));
483      }
484    }
485  }
486
487  @Test
488  public void testHTableExistsMethodSingleRegionSingleGet()
489          throws IOException, InterruptedException {
490    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
491      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
492
493      // Test with a single region table.
494      Put put = new Put(ROW);
495      put.addColumn(FAMILY, QUALIFIER, VALUE);
496
497      Get get = new Get(ROW);
498
499      boolean exist = table.exists(get);
500      assertFalse(exist);
501
502      table.put(put);
503
504      exist = table.exists(get);
505      assertTrue(exist);
506    }
507  }
508
509  @Test
510  public void testHTableExistsMethodSingleRegionMultipleGets()
511          throws IOException, InterruptedException {
512    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
513      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
514
515      Put put = new Put(ROW);
516      put.addColumn(FAMILY, QUALIFIER, VALUE);
517      table.put(put);
518
519      List<Get> gets = new ArrayList<>();
520      gets.add(new Get(ROW));
521      gets.add(new Get(ANOTHERROW));
522
523      boolean[] results = table.exists(gets);
524      assertTrue(results[0]);
525      assertFalse(results[1]);
526    }
527  }
528
529  @Test
530  public void testHTableExistsBeforeGet() throws IOException, InterruptedException {
531    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
532      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
533
534      Put put = new Put(ROW);
535      put.addColumn(FAMILY, QUALIFIER, VALUE);
536      table.put(put);
537
538      Get get = new Get(ROW);
539
540      boolean exist = table.exists(get);
541      assertEquals(true, exist);
542
543      Result result = table.get(get);
544      assertEquals(false, result.isEmpty());
545      assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER)));
546    }
547  }
548
549  @Test
550  public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException {
551    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
552      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
553
554      final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2"));
555      Put put = new Put(ROW);
556      put.addColumn(FAMILY, QUALIFIER, VALUE);
557      table.put(put);
558      put = new Put(ROW2);
559      put.addColumn(FAMILY, QUALIFIER, VALUE);
560      table.put(put);
561
562      Get get = new Get(ROW);
563      Get get2 = new Get(ROW2);
564      ArrayList<Get> getList = new ArrayList(2);
565      getList.add(get);
566      getList.add(get2);
567
568      boolean[] exists = table.existsAll(getList);
569      assertEquals(true, exists[0]);
570      assertEquals(true, exists[1]);
571
572      Result[] result = table.get(getList);
573      assertEquals(false, result[0].isEmpty());
574      assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER)));
575      assertEquals(false, result[1].isEmpty());
576      assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER)));
577    }
578  }
579
580  @Test
581  public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception {
582    try (Table table = TEST_UTIL.createTable(
583      tableName, new byte[][] { FAMILY },
584      1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
585      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
586
587      Put put = new Put(ROW);
588      put.addColumn(FAMILY, QUALIFIER, VALUE);
589
590      Get get = new Get(ROW);
591
592      boolean exist = table.exists(get);
593      assertFalse(exist);
594
595      table.put(put);
596
597      exist = table.exists(get);
598      assertTrue(exist);
599    }
600  }
601
602  @Test
603  public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception {
604    try (Table table = TEST_UTIL.createTable(
605      tableName,
606      new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
607      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
608
609      Put put = new Put(ROW);
610      put.addColumn(FAMILY, QUALIFIER, VALUE);
611      table.put(put);
612
613      List<Get> gets = new ArrayList<>();
614      gets.add(new Get(ANOTHERROW));
615      gets.add(new Get(Bytes.add(ROW, new byte[]{0x00})));
616      gets.add(new Get(ROW));
617      gets.add(new Get(Bytes.add(ANOTHERROW, new byte[]{0x00})));
618
619      LOG.info("Calling exists");
620      boolean[] results = table.existsAll(gets);
621      assertFalse(results[0]);
622      assertFalse(results[1]);
623      assertTrue(results[2]);
624      assertFalse(results[3]);
625
626      // Test with the first region.
627      put = new Put(new byte[]{0x00});
628      put.addColumn(FAMILY, QUALIFIER, VALUE);
629      table.put(put);
630
631      gets = new ArrayList<>();
632      gets.add(new Get(new byte[]{0x00}));
633      gets.add(new Get(new byte[]{0x00, 0x00}));
634      results = table.existsAll(gets);
635      assertTrue(results[0]);
636      assertFalse(results[1]);
637
638      // Test with the last region
639      put = new Put(new byte[]{(byte) 0xff, (byte) 0xff});
640      put.addColumn(FAMILY, QUALIFIER, VALUE);
641      table.put(put);
642
643      gets = new ArrayList<>();
644      gets.add(new Get(new byte[]{(byte) 0xff}));
645      gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff}));
646      gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff}));
647      results = table.existsAll(gets);
648      assertFalse(results[0]);
649      assertTrue(results[1]);
650      assertFalse(results[2]);
651    }
652  }
653
654  @Test
655  public void testGetEmptyRow() throws Exception {
656    //Create a table and put in 1 row
657    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
658      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
659
660      Put put = new Put(ROW_BYTES);
661      put.addColumn(FAMILY, COL_QUAL, VAL_BYTES);
662      table.put(put);
663
664      //Try getting the row with an empty row key
665      Result res = null;
666      try {
667        res = table.get(new Get(new byte[0]));
668        fail();
669      } catch (IllegalArgumentException e) {
670        // Expected.
671      }
672      assertTrue(res == null);
673      res = table.get(new Get(Bytes.toBytes("r1-not-exist")));
674      assertTrue(res.isEmpty() == true);
675      res = table.get(new Get(ROW_BYTES));
676      assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
677    }
678  }
679
680  @Test
681  public void testConnectionDefaultUsesCodec() throws Exception {
682    ClusterConnection con = (ClusterConnection) TEST_UTIL.getConnection();
683    assertTrue(con.hasCellBlockSupport());
684  }
685
686  @Test
687  public void testPutWithPreBatchMutate() throws Exception {
688    testPreBatchMutate(tableName, () -> {
689      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
690        Put put = new Put(ROW);
691        put.addColumn(FAMILY, QUALIFIER, VALUE);
692        t.put(put);
693      } catch (IOException ex) {
694        throw new RuntimeException(ex);
695      }
696    });
697  }
698
699  @Test
700  public void testRowMutationsWithPreBatchMutate() throws Exception {
701    testPreBatchMutate(tableName, () -> {
702      try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
703        RowMutations rm = new RowMutations(ROW, 1);
704        Put put = new Put(ROW);
705        put.addColumn(FAMILY, QUALIFIER, VALUE);
706        rm.add(put);
707        t.mutateRow(rm);
708      } catch (IOException ex) {
709        throw new RuntimeException(ex);
710      }
711    });
712  }
713
714  private void testPreBatchMutate(TableName tableName, Runnable rn)throws Exception {
715    HTableDescriptor desc = new HTableDescriptor(tableName);
716    desc.addCoprocessor(WaitingForScanObserver.class.getName());
717    desc.addFamily(new HColumnDescriptor(FAMILY));
718    TEST_UTIL.getAdmin().createTable(desc);
719    // Don't use waitTableAvailable(), because the scanner will mess up the co-processor
720
721    ExecutorService service = Executors.newFixedThreadPool(2);
722    service.execute(rn);
723    final List<Cell> cells = new ArrayList<>();
724    service.execute(() -> {
725      try {
726        // waiting for update.
727        TimeUnit.SECONDS.sleep(3);
728        try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
729          Scan scan = new Scan();
730          try (ResultScanner scanner = t.getScanner(scan)) {
731            for (Result r : scanner) {
732              cells.addAll(Arrays.asList(r.rawCells()));
733            }
734          }
735        }
736      } catch (IOException | InterruptedException ex) {
737        throw new RuntimeException(ex);
738      }
739    });
740    service.shutdown();
741    service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
742    assertEquals("The write is blocking by RegionObserver#postBatchMutate"
743            + ", so the data is invisible to reader", 0, cells.size());
744    TEST_UTIL.deleteTable(tableName);
745  }
746
747  @Test
748  public void testLockLeakWithDelta() throws Exception, Throwable {
749    HTableDescriptor desc = new HTableDescriptor(tableName);
750    desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
751    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
752    desc.addFamily(new HColumnDescriptor(FAMILY));
753    TEST_UTIL.getAdmin().createTable(desc);
754    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
755
756    // new a connection for lower retry number.
757    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
758    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
759    try (Connection con = ConnectionFactory.createConnection(copy)) {
760      HRegion region = (HRegion) find(tableName);
761      region.setTimeoutForWriteLock(10);
762      ExecutorService putService = Executors.newSingleThreadExecutor();
763      putService.execute(() -> {
764        try (Table table = con.getTable(tableName)) {
765          Put put = new Put(ROW);
766          put.addColumn(FAMILY, QUALIFIER, VALUE);
767          // the put will be blocked by WaitingForMultiMutationsObserver.
768          table.put(put);
769        } catch (IOException ex) {
770          throw new RuntimeException(ex);
771        }
772      });
773      ExecutorService appendService = Executors.newSingleThreadExecutor();
774      appendService.execute(() -> {
775        Append append = new Append(ROW);
776        append.addColumn(FAMILY, QUALIFIER, VALUE);
777        try (Table table = con.getTable(tableName)) {
778          table.append(append);
779          fail("The APPEND should fail because the target lock is blocked by previous put");
780        } catch (Exception ex) {
781        }
782      });
783      appendService.shutdown();
784      appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
785      WaitingForMultiMutationsObserver observer =
786              find(tableName, WaitingForMultiMutationsObserver.class);
787      observer.latch.countDown();
788      putService.shutdown();
789      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
790      try (Table table = con.getTable(tableName)) {
791        Result r = table.get(new Get(ROW));
792        assertFalse(r.isEmpty());
793        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
794      }
795    }
796    HRegion region = (HRegion) find(tableName);
797    int readLockCount = region.getReadLockCount();
798    LOG.info("readLockCount:" + readLockCount);
799    assertEquals(0, readLockCount);
800  }
801
802  @Test
803  public void testMultiRowMutations() throws Exception, Throwable {
804    HTableDescriptor desc = new HTableDescriptor(tableName);
805    desc.addCoprocessor(MultiRowMutationEndpoint.class.getName());
806    desc.addCoprocessor(WaitingForMultiMutationsObserver.class.getName());
807    desc.setConfiguration("hbase.rowlock.wait.duration", String.valueOf(5000));
808    desc.addFamily(new HColumnDescriptor(FAMILY));
809    TEST_UTIL.getAdmin().createTable(desc);
810    TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
811
812    // new a connection for lower retry number.
813    Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
814    copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
815    try (Connection con = ConnectionFactory.createConnection(copy)) {
816      byte[] row = Bytes.toBytes("ROW-0");
817      byte[] rowLocked= Bytes.toBytes("ROW-1");
818      byte[] value0 = Bytes.toBytes("VALUE-0");
819      byte[] value1 = Bytes.toBytes("VALUE-1");
820      byte[] value2 = Bytes.toBytes("VALUE-2");
821      assertNoLocks(tableName);
822      ExecutorService putService = Executors.newSingleThreadExecutor();
823      putService.execute(() -> {
824        try (Table table = con.getTable(tableName)) {
825          Put put0 = new Put(rowLocked);
826          put0.addColumn(FAMILY, QUALIFIER, value0);
827          // the put will be blocked by WaitingForMultiMutationsObserver.
828          table.put(put0);
829        } catch (IOException ex) {
830          throw new RuntimeException(ex);
831        }
832      });
833      ExecutorService cpService = Executors.newSingleThreadExecutor();
834      cpService.execute(() -> {
835        boolean threw;
836        Put put1 = new Put(row);
837        Put put2 = new Put(rowLocked);
838        put1.addColumn(FAMILY, QUALIFIER, value1);
839        put2.addColumn(FAMILY, QUALIFIER, value2);
840        try (Table table = con.getTable(tableName)) {
841          MultiRowMutationProtos.MutateRowsRequest request
842            = MultiRowMutationProtos.MutateRowsRequest.newBuilder()
843              .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
844                      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto
845                              .MutationType.PUT, put1))
846              .addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
847                      org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto
848                              .MutationType.PUT, put2))
849              .build();
850          table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class,
851            ROW, ROW,
852            (MultiRowMutationProtos.MultiRowMutationService exe) -> {
853              ServerRpcController controller = new ServerRpcController();
854              CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse>
855                rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
856              exe.mutateRows(controller, request, rpcCallback);
857              return rpcCallback.get();
858            });
859          threw = false;
860        } catch (Throwable ex) {
861          threw = true;
862        }
863        if (!threw) {
864          // Can't call fail() earlier because the catch would eat it.
865          fail("This cp should fail because the target lock is blocked by previous put");
866        }
867      });
868      cpService.shutdown();
869      cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
870      WaitingForMultiMutationsObserver observer = find(tableName, WaitingForMultiMutationsObserver.class);
871      observer.latch.countDown();
872      putService.shutdown();
873      putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
874      try (Table table = con.getTable(tableName)) {
875        Get g0 = new Get(row);
876        Get g1 = new Get(rowLocked);
877        Result r0 = table.get(g0);
878        Result r1 = table.get(g1);
879        assertTrue(r0.isEmpty());
880        assertFalse(r1.isEmpty());
881        assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
882      }
883      assertNoLocks(tableName);
884    }
885  }
886
887  /**
888   * A test case for issue HBASE-17482
889   * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped
890   * onto cells in the append thread, a countdown latch is used to ensure that happened
891   * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698)
892   * make the seqid/mvcc acquirement in handler thread and stamping in append thread
893   * No countdown latch to assure cells in memstore are stamped with seqid/mvcc.
894   * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner
895   * with a smaller readpoint can see these data, which disobey the multi version
896   * concurrency control rules.
897   * This test case is to reproduce this scenario.
898   * @throws IOException
899   */
900  @Test
901  public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException {
902    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
903      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
904      //put two row first to init the scanner
905      Put put = new Put(Bytes.toBytes("0"));
906      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
907      table.put(put);
908      put = new Put(Bytes.toBytes("00"));
909      put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
910      table.put(put);
911      Scan scan = new Scan();
912      scan.setTimeRange(0, Long.MAX_VALUE);
913      scan.setCaching(1);
914      ResultScanner scanner = table.getScanner(scan);
915      int rowNum = scanner.next() != null ? 1 : 0;
916      //the started scanner shouldn't see the rows put below
917      for (int i = 1; i < 1000; i++) {
918        put = new Put(Bytes.toBytes(String.valueOf(i)));
919        put.setDurability(Durability.ASYNC_WAL);
920        put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i));
921        table.put(put);
922      }
923      for (Result result : scanner) {
924        rowNum++;
925      }
926      //scanner should only see two rows
927      assertEquals(2, rowNum);
928      scanner = table.getScanner(scan);
929      rowNum = 0;
930      for (Result result : scanner) {
931        rowNum++;
932      }
933      // the new scanner should see all rows
934      assertEquals(1001, rowNum);
935    }
936  }
937
938  @Test
939  public void testPutThenGetWithMultipleThreads() throws Exception {
940    final int THREAD_NUM = 20;
941    final int ROUND_NUM = 10;
942    for (int round = 0; round < ROUND_NUM; round++) {
943      ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
944      final AtomicInteger successCnt = new AtomicInteger(0);
945      try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
946        TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
947
948        for (int i = 0; i < THREAD_NUM; i++) {
949          final int index = i;
950          Thread t = new Thread(new Runnable() {
951
952            @Override
953            public void run() {
954              final byte[] row = Bytes.toBytes("row-" + index);
955              final byte[] value = Bytes.toBytes("v" + index);
956              try {
957                Put put = new Put(row);
958                put.addColumn(FAMILY, QUALIFIER, value);
959                ht.put(put);
960                Get get = new Get(row);
961                Result result = ht.get(get);
962                byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
963                if (Bytes.equals(value, returnedValue)) {
964                  successCnt.getAndIncrement();
965                } else {
966                  LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
967                          + ", returned value: "
968                          + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
969                }
970              } catch (Throwable e) {
971                // do nothing
972              }
973            }
974          });
975          threads.add(t);
976        }
977        for (Thread t : threads) {
978          t.start();
979        }
980        for (Thread t : threads) {
981          t.join();
982        }
983        assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
984      }
985      TEST_UTIL.deleteTable(tableName);
986    }
987  }
988
989  private static void assertNoLocks(final TableName tableName)
990          throws IOException, InterruptedException {
991    HRegion region = (HRegion) find(tableName);
992    assertEquals(0, region.getLockedRows().size());
993  }
994  private static HRegion find(final TableName tableName)
995      throws IOException, InterruptedException {
996    HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
997    List<HRegion> regions = rs.getRegions(tableName);
998    assertEquals(1, regions.size());
999    return regions.get(0);
1000  }
1001
1002  private static <T extends RegionObserver> T find(final TableName tableName,
1003          Class<T> clz) throws IOException, InterruptedException {
1004    HRegion region = find(tableName);
1005    Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
1006    assertTrue("The cp instance should be " + clz.getName()
1007            + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
1008    return clz.cast(cp);
1009  }
1010
1011  public static class WaitingForMultiMutationsObserver
1012      implements RegionCoprocessor, RegionObserver {
1013    final CountDownLatch latch = new CountDownLatch(1);
1014
1015    @Override
1016    public Optional<RegionObserver> getRegionObserver() {
1017      return Optional.of(this);
1018    }
1019
1020    @Override
1021    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1022            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1023      try {
1024        latch.await();
1025      } catch (InterruptedException ex) {
1026        throw new IOException(ex);
1027      }
1028    }
1029  }
1030
1031  public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
1032    private final CountDownLatch latch = new CountDownLatch(1);
1033
1034    @Override
1035    public Optional<RegionObserver> getRegionObserver() {
1036      return Optional.of(this);
1037    }
1038
1039    @Override
1040    public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1041            final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1042      try {
1043        // waiting for scanner
1044        latch.await();
1045      } catch (InterruptedException ex) {
1046        throw new IOException(ex);
1047      }
1048    }
1049
1050    @Override
1051    public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
1052            final Scan scan, final RegionScanner s) throws IOException {
1053      latch.countDown();
1054      return s;
1055    }
1056  }
1057
1058  static byte[] generateHugeValue(int size) {
1059    Random rand = ThreadLocalRandom.current();
1060    byte[] value = new byte[size];
1061    for (int i = 0; i < value.length; i++) {
1062      value[i] = (byte) rand.nextInt(256);
1063    }
1064    return value;
1065  }
1066
1067  @Test
1068  public void testScanWithBatchSizeReturnIncompleteCells()
1069          throws IOException, InterruptedException {
1070    TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
1071      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
1072      .build();
1073    try (Table table = TEST_UTIL.createTable(hd, null)) {
1074      TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1075
1076      Put put = new Put(ROW);
1077      put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
1078      table.put(put);
1079
1080      put = new Put(ROW);
1081      put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
1082      table.put(put);
1083
1084      for (int i = 2; i < 5; i++) {
1085        for (int version = 0; version < 2; version++) {
1086          put = new Put(ROW);
1087          put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
1088          table.put(put);
1089        }
1090      }
1091
1092      Scan scan = new Scan();
1093      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3)
1094              .setMaxResultSize(4 * 1024 * 1024);
1095      Result result;
1096      try (ResultScanner scanner = table.getScanner(scan)) {
1097        List<Result> list = new ArrayList<>();
1098        /*
1099         * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB;
1100         * The second scan rpc should return a result with 3 cells, because reach the batch limit
1101         * = 3;
1102         * The mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
1103         * moreResultsInRegion also would be false. Finally, the client should collect all the cells
1104         * into two result: 2+3 -> 3+2;
1105         */
1106        while ((result = scanner.next()) != null) {
1107          list.add(result);
1108        }
1109
1110        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1111        Assert.assertEquals(2, list.size());
1112        Assert.assertEquals(3, list.get(0).size());
1113        Assert.assertEquals(2, list.get(1).size());
1114      }
1115
1116      scan = new Scan();
1117      scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2)
1118              .setMaxResultSize(4 * 1024 * 1024);
1119      try (ResultScanner scanner = table.getScanner(scan)) {
1120        List<Result> list = new ArrayList<>();
1121        while ((result = scanner.next()) != null) {
1122          list.add(result);
1123        }
1124        Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1125        Assert.assertEquals(3, list.size());
1126        Assert.assertEquals(2, list.get(0).size());
1127        Assert.assertEquals(2, list.get(1).size());
1128        Assert.assertEquals(1, list.get(2).size());
1129      }
1130    }
1131  }
1132}