001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.Arrays;
026import java.util.List;
027import java.util.Map;
028import java.util.NavigableMap;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Get;
033import org.apache.hadoop.hbase.client.Increment;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.Row;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.io.TimeRange;
039import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
040import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
045import org.junit.jupiter.api.AfterAll;
046import org.junit.jupiter.api.AfterEach;
047import org.junit.jupiter.api.BeforeAll;
048import org.junit.jupiter.api.BeforeEach;
049import org.junit.jupiter.api.Tag;
050import org.junit.jupiter.api.Test;
051
052/**
053 * This test runs batch mutation with Increments which have custom TimeRange. Custom Observer
054 * records the TimeRange. We then verify that the recorded TimeRange has same bounds as the initial
055 * TimeRange. See HBASE-15698
056 */
057@Tag(CoprocessorTests.TAG)
058@Tag(MediumTests.TAG)
059public class TestIncrementTimeRange {
060
061  private static final HBaseTestingUtil util = new HBaseTestingUtil();
062  private static ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
063
064  private static final TableName TEST_TABLE = TableName.valueOf("test");
065  private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
066
067  private static final byte[] ROW_A = Bytes.toBytes("aaa");
068  private static final byte[] ROW_B = Bytes.toBytes("bbb");
069  private static final byte[] ROW_C = Bytes.toBytes("ccc");
070
071  private static final byte[] qualifierCol1 = Bytes.toBytes("col1");
072
073  private static final byte[] bytes1 = Bytes.toBytes(1);
074  private static final byte[] bytes2 = Bytes.toBytes(2);
075  private static final byte[] bytes3 = Bytes.toBytes(3);
076
077  private Table hTableInterface;
078  private Table table;
079
080  @BeforeAll
081  public static void setupBeforeClass() throws Exception {
082    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
083      MyObserver.class.getName());
084    // Make general delay zero rather than default. Timing is off in this
085    // test that depends on an evironment edge that is manually moved forward.
086    util.getConfiguration().setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 0);
087    util.startMiniCluster();
088    EnvironmentEdgeManager.injectEdge(mee);
089  }
090
091  @AfterAll
092  public static void tearDownAfterClass() throws Exception {
093    util.shutdownMiniCluster();
094  }
095
096  @BeforeEach
097  public void before() throws Exception {
098    table = util.createTable(TEST_TABLE, TEST_FAMILY);
099
100    Put puta = new Put(ROW_A);
101    puta.addColumn(TEST_FAMILY, qualifierCol1, bytes1);
102    table.put(puta);
103
104    Put putb = new Put(ROW_B);
105    putb.addColumn(TEST_FAMILY, qualifierCol1, bytes2);
106    table.put(putb);
107
108    Put putc = new Put(ROW_C);
109    putc.addColumn(TEST_FAMILY, qualifierCol1, bytes3);
110    table.put(putc);
111  }
112
113  @AfterEach
114  public void after() throws Exception {
115    try {
116      if (table != null) {
117        table.close();
118      }
119    } finally {
120      try {
121        util.deleteTable(TEST_TABLE);
122      } catch (IOException ioe) {
123      }
124    }
125  }
126
127  public static class MyObserver extends SimpleRegionObserver {
128    static TimeRange tr10 = null, tr2 = null;
129
130    @Override
131    public Result preIncrement(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
132      final Increment increment) throws IOException {
133      NavigableMap<byte[], List<Cell>> map = increment.getFamilyCellMap();
134      for (Map.Entry<byte[], List<Cell>> entry : map.entrySet()) {
135        for (Cell cell : entry.getValue()) {
136          long incr =
137            Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
138          if (incr == 10) {
139            tr10 = increment.getTimeRange();
140          } else if (incr == 2 && !increment.getTimeRange().isAllTime()) {
141            tr2 = increment.getTimeRange();
142          }
143        }
144      }
145      return super.preIncrement(e, increment);
146    }
147  }
148
149  @Test
150  public void testHTableInterfaceMethods() throws Exception {
151    hTableInterface = util.getConnection().getTable(TEST_TABLE);
152    checkHTableInterfaceMethods();
153  }
154
155  private void checkHTableInterfaceMethods() throws Exception {
156    long time = EnvironmentEdgeManager.currentTime();
157    mee.setValue(time);
158    hTableInterface.put(new Put(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, Bytes.toBytes(1L)));
159    checkRowValue(ROW_A, Bytes.toBytes(1L));
160
161    time = EnvironmentEdgeManager.currentTime();
162    mee.setValue(time);
163    TimeRange range10 = TimeRange.between(1, time + 10);
164    hTableInterface.increment(new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 10L)
165      .setTimeRange(range10.getMin(), range10.getMax()));
166    checkRowValue(ROW_A, Bytes.toBytes(11L));
167    assertEquals(MyObserver.tr10.getMin(), range10.getMin());
168    assertEquals(MyObserver.tr10.getMax(), range10.getMax());
169
170    time = EnvironmentEdgeManager.currentTime();
171    mee.setValue(time);
172    TimeRange range2 = TimeRange.between(1, time + 20);
173    List<Row> actions = Arrays.asList(new Row[] {
174      new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L).setTimeRange(range2.getMin(),
175        range2.getMax()),
176      new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L).setTimeRange(range2.getMin(),
177        range2.getMax()) });
178    Object[] results3 = new Object[actions.size()];
179    Object[] results1 = results3;
180    hTableInterface.batch(actions, results1);
181    assertEquals(MyObserver.tr2.getMin(), range2.getMin());
182    assertEquals(MyObserver.tr2.getMax(), range2.getMax());
183    for (Object r2 : results1) {
184      assertTrue(r2 instanceof Result);
185    }
186    checkRowValue(ROW_A, Bytes.toBytes(15L));
187
188    hTableInterface.close();
189  }
190
191  private void checkRowValue(byte[] row, byte[] expectedValue) throws IOException {
192    Get get = new Get(row).addColumn(TEST_FAMILY, qualifierCol1);
193    Result result = hTableInterface.get(get);
194    byte[] actualValue = result.getValue(TEST_FAMILY, qualifierCol1);
195    assertArrayEquals(expectedValue, actualValue);
196  }
197}