001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.Collection;
024import java.util.List;
025import java.util.Optional;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.function.Predicate;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Durability;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
043import org.apache.hadoop.hbase.coprocessor.ObserverContext;
044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
046import org.apache.hadoop.hbase.coprocessor.RegionObserver;
047import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
048import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
049import org.apache.hadoop.hbase.regionserver.InternalScanner;
050import org.apache.hadoop.hbase.regionserver.Region;
051import org.apache.hadoop.hbase.regionserver.ScanType;
052import org.apache.hadoop.hbase.regionserver.ScannerContext;
053import org.apache.hadoop.hbase.regionserver.Store;
054import org.apache.hadoop.hbase.regionserver.StoreScanner;
055import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
056import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.testclassification.MiscTests;
059import org.apache.hadoop.hbase.wal.WALEdit;
060import org.junit.AfterClass;
061import org.junit.BeforeClass;
062import org.junit.ClassRule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065import org.junit.runner.RunWith;
066import org.junit.runners.Parameterized;
067import org.junit.runners.Parameterized.Parameters;
068
069@Category({ MiscTests.class, MediumTests.class })
070@RunWith(Parameterized.class)
071public class TestCoprocessorScanPolicy {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075    HBaseClassTestRule.forClass(TestCoprocessorScanPolicy.class);
076
077  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
078  private static final byte[] F = Bytes.toBytes("fam");
079  private static final byte[] Q = Bytes.toBytes("qual");
080  private static final byte[] R = Bytes.toBytes("row");
081
082  @BeforeClass
083  public static void setUpBeforeClass() throws Exception {
084    Configuration conf = TEST_UTIL.getConfiguration();
085    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName());
086    TEST_UTIL.startMiniCluster();
087  }
088
089  @AfterClass
090  public static void tearDownAfterClass() throws Exception {
091    TEST_UTIL.shutdownMiniCluster();
092  }
093
094  @Parameters
095  public static Collection<Object[]> parameters() {
096    return HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED;
097  }
098
099  public TestCoprocessorScanPolicy(boolean parallelSeekEnable) {
100    TEST_UTIL.getMiniHBaseCluster().getConf()
101      .setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable);
102  }
103
104  @Test
105  public void testBaseCases() throws Exception {
106    TableName tableName = TableName.valueOf("baseCases");
107    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
108      TEST_UTIL.deleteTable(tableName);
109    }
110    Table t = TEST_UTIL.createTable(tableName, F, 10);
111    // insert 3 versions
112    long now = EnvironmentEdgeManager.currentTime();
113    Put p = new Put(R);
114    p.addColumn(F, Q, now, Q);
115    t.put(p);
116    p = new Put(R);
117    p.addColumn(F, Q, now + 1, Q);
118    t.put(p);
119    p = new Put(R);
120    p.addColumn(F, Q, now + 2, Q);
121    t.put(p);
122
123    Get g = new Get(R);
124    g.readVersions(10);
125    Result r = t.get(g);
126    assertEquals(3, r.size());
127
128    TEST_UTIL.flush(tableName);
129    TEST_UTIL.compact(tableName, true);
130
131    // still visible after a flush/compaction
132    r = t.get(g);
133    assertEquals(3, r.size());
134
135    // set the version override to 2
136    p = new Put(R);
137    p.setAttribute("versions", new byte[] {});
138    p.addColumn(F, tableName.getName(), Bytes.toBytes(2));
139    t.put(p);
140
141    // only 2 versions now
142    r = t.get(g);
143    assertEquals(2, r.size());
144
145    TEST_UTIL.flush(tableName);
146    TEST_UTIL.compact(tableName, true);
147
148    // still 2 versions after a flush/compaction
149    r = t.get(g);
150    assertEquals(2, r.size());
151
152    // insert a new version
153    p.addColumn(F, Q, now + 3, Q);
154    t.put(p);
155
156    // still 2 versions
157    r = t.get(g);
158    assertEquals(2, r.size());
159
160    t.close();
161  }
162
163  @Test
164  public void testTTL() throws Exception {
165    TableName tableName = TableName.valueOf("testTTL");
166    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
167      TEST_UTIL.deleteTable(tableName);
168    }
169    Table t = TEST_UTIL.createTable(tableName, F, 10);
170    long now = EnvironmentEdgeManager.currentTime();
171    ManualEnvironmentEdge me = new ManualEnvironmentEdge();
172    me.setValue(now);
173    EnvironmentEdgeManagerTestHelper.injectEdge(me);
174    // 2s in the past
175    long ts = now - 2000;
176
177    Put p = new Put(R);
178    p.addColumn(F, Q, ts, Q);
179    t.put(p);
180    p = new Put(R);
181    p.addColumn(F, Q, ts + 1, Q);
182    t.put(p);
183
184    // Set the TTL override to 3s
185    p = new Put(R);
186    p.setAttribute("ttl", new byte[] {});
187    p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L));
188    t.put(p);
189    // these two should still be there
190    Get g = new Get(R);
191    g.readAllVersions();
192    Result r = t.get(g);
193    // still there?
194    assertEquals(2, r.size());
195
196    TEST_UTIL.flush(tableName);
197    TEST_UTIL.compact(tableName, true);
198
199    g = new Get(R);
200    g.readAllVersions();
201    r = t.get(g);
202    // still there?
203    assertEquals(2, r.size());
204
205    // roll time forward 2s.
206    me.setValue(now + 2000);
207    // now verify that data eventually does expire
208    g = new Get(R);
209    g.readAllVersions();
210    r = t.get(g);
211    // should be gone now
212    assertEquals(0, r.size());
213    t.close();
214    EnvironmentEdgeManager.reset();
215  }
216
217  public static class ScanObserver implements RegionCoprocessor, RegionObserver {
218    private final ConcurrentMap<TableName, Long> ttls = new ConcurrentHashMap<>();
219    private final ConcurrentMap<TableName, Integer> versions = new ConcurrentHashMap<>();
220
221    @Override
222    public Optional<RegionObserver> getRegionObserver() {
223      return Optional.of(this);
224    }
225
226    // lame way to communicate with the coprocessor,
227    // since it is loaded by a different class loader
228    @Override
229    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
230      final WALEdit edit, final Durability durability) throws IOException {
231      if (put.getAttribute("ttl") != null) {
232        Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0);
233        ttls.put(
234          TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
235            cell.getQualifierLength())),
236          Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
237        c.bypass();
238      } else if (put.getAttribute("versions") != null) {
239        Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0);
240        versions.put(
241          TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
242            cell.getQualifierLength())),
243          Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
244        c.bypass();
245      }
246    }
247
248    private InternalScanner wrap(Store store, InternalScanner scanner) {
249      Long ttl = this.ttls.get(store.getTableName());
250      Integer version = this.versions.get(store.getTableName());
251      return new DelegatingInternalScanner(scanner) {
252
253        private byte[] row;
254
255        private byte[] qualifier;
256
257        private int count;
258
259        private Predicate<Cell> checkTtl(long now, long ttl) {
260          return c -> now - c.getTimestamp() > ttl;
261        }
262
263        private Predicate<Cell> checkVersion(Cell firstCell, int version) {
264          if (version == 0) {
265            return c -> true;
266          } else {
267            if (row == null || !CellUtil.matchingRows(firstCell, row)) {
268              row = CellUtil.cloneRow(firstCell);
269              // reset qualifier as there is a row change
270              qualifier = null;
271            }
272            return c -> {
273              if (qualifier != null && CellUtil.matchingQualifier(c, qualifier)) {
274                if (count >= version) {
275                  return true;
276                }
277                count++;
278                return false;
279              } else { // qualifier switch
280                qualifier = CellUtil.cloneQualifier(c);
281                count = 1;
282                return false;
283              }
284            };
285          }
286        }
287
288        @Override
289        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
290          boolean moreRows = scanner.next(result, scannerContext);
291          if (result.isEmpty()) {
292            return moreRows;
293          }
294          long now = EnvironmentEdgeManager.currentTime();
295          Predicate<Cell> predicate = null;
296          if (ttl != null) {
297            predicate = checkTtl(now, ttl);
298          }
299          if (version != null) {
300            Predicate<Cell> vp = checkVersion(result.get(0), version);
301            if (predicate != null) {
302              predicate = predicate.and(vp);
303            } else {
304              predicate = vp;
305            }
306          }
307          if (predicate != null) {
308            result.removeIf(predicate);
309          }
310          return moreRows;
311        }
312      };
313    }
314
315    @Override
316    public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
317      InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
318      return wrap(store, scanner);
319    }
320
321    @Override
322    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
323      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
324      CompactionRequest request) throws IOException {
325      return wrap(store, scanner);
326    }
327
328    @Override
329    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
330      List<Cell> result) throws IOException {
331      TableName tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName();
332      Long ttl = this.ttls.get(tableName);
333      if (ttl != null) {
334        get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, get.getTimeRange().getMax());
335      }
336      Integer version = this.versions.get(tableName);
337      if (version != null) {
338        get.readVersions(version);
339      }
340    }
341
342    @Override
343    public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
344      throws IOException {
345      Region region = c.getEnvironment().getRegion();
346      TableName tableName = region.getTableDescriptor().getTableName();
347      Long ttl = this.ttls.get(tableName);
348      if (ttl != null) {
349        scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, scan.getTimeRange().getMax());
350      }
351      Integer version = this.versions.get(tableName);
352      if (version != null) {
353        scan.readVersions(version);
354      }
355    }
356  }
357}