001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.function.Predicate;
028import java.util.stream.Stream;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CellUtil;
032import org.apache.hadoop.hbase.ExtendedCell;
033import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
034import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Durability;
038import org.apache.hadoop.hbase.client.Get;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.Scan;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
044import org.apache.hadoop.hbase.coprocessor.ObserverContext;
045import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
046import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
047import org.apache.hadoop.hbase.coprocessor.RegionObserver;
048import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
049import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
050import org.apache.hadoop.hbase.regionserver.InternalScanner;
051import org.apache.hadoop.hbase.regionserver.Region;
052import org.apache.hadoop.hbase.regionserver.ScanType;
053import org.apache.hadoop.hbase.regionserver.ScannerContext;
054import org.apache.hadoop.hbase.regionserver.Store;
055import org.apache.hadoop.hbase.regionserver.StoreScanner;
056import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
057import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
058import org.apache.hadoop.hbase.testclassification.MediumTests;
059import org.apache.hadoop.hbase.testclassification.MiscTests;
060import org.apache.hadoop.hbase.wal.WALEdit;
061import org.junit.jupiter.api.AfterAll;
062import org.junit.jupiter.api.BeforeAll;
063import org.junit.jupiter.api.Tag;
064import org.junit.jupiter.api.TestTemplate;
065import org.junit.jupiter.params.provider.Arguments;
066
067@Tag(MiscTests.TAG)
068@Tag(MediumTests.TAG)
069@HBaseParameterizedTestTemplate
070public class TestCoprocessorScanPolicy {
071
072  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
073  private static final byte[] F = Bytes.toBytes("fam");
074  private static final byte[] Q = Bytes.toBytes("qual");
075  private static final byte[] R = Bytes.toBytes("row");
076
077  @BeforeAll
078  public static void setUpBeforeClass() throws Exception {
079    Configuration conf = TEST_UTIL.getConfiguration();
080    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName());
081    TEST_UTIL.startMiniCluster();
082  }
083
084  @AfterAll
085  public static void tearDownAfterClass() throws Exception {
086    TEST_UTIL.shutdownMiniCluster();
087  }
088
089  public static Stream<Arguments> parameters() {
090    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED.stream().map(arr -> Arguments.of(arr));
091  }
092
093  public TestCoprocessorScanPolicy(boolean parallelSeekEnable) {
094    TEST_UTIL.getMiniHBaseCluster().getConf()
095      .setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable);
096  }
097
098  @TestTemplate
099  public void testBaseCases() throws Exception {
100    TableName tableName = TableName.valueOf("baseCases");
101    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
102      TEST_UTIL.deleteTable(tableName);
103    }
104    Table t = TEST_UTIL.createTable(tableName, F, 10);
105    // insert 3 versions
106    long now = EnvironmentEdgeManager.currentTime();
107    Put p = new Put(R);
108    p.addColumn(F, Q, now, Q);
109    t.put(p);
110    p = new Put(R);
111    p.addColumn(F, Q, now + 1, Q);
112    t.put(p);
113    p = new Put(R);
114    p.addColumn(F, Q, now + 2, Q);
115    t.put(p);
116
117    Get g = new Get(R);
118    g.readVersions(10);
119    Result r = t.get(g);
120    assertEquals(3, r.size());
121
122    TEST_UTIL.flush(tableName);
123    TEST_UTIL.compact(tableName, true);
124
125    // still visible after a flush/compaction
126    r = t.get(g);
127    assertEquals(3, r.size());
128
129    // set the version override to 2
130    p = new Put(R);
131    p.setAttribute("versions", new byte[] {});
132    p.addColumn(F, tableName.getName(), Bytes.toBytes(2));
133    t.put(p);
134
135    // only 2 versions now
136    r = t.get(g);
137    assertEquals(2, r.size());
138
139    TEST_UTIL.flush(tableName);
140    TEST_UTIL.compact(tableName, true);
141
142    // still 2 versions after a flush/compaction
143    r = t.get(g);
144    assertEquals(2, r.size());
145
146    // insert a new version
147    p.addColumn(F, Q, now + 3, Q);
148    t.put(p);
149
150    // still 2 versions
151    r = t.get(g);
152    assertEquals(2, r.size());
153
154    t.close();
155  }
156
157  @TestTemplate
158  public void testTTL() throws Exception {
159    TableName tableName = TableName.valueOf("testTTL");
160    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
161      TEST_UTIL.deleteTable(tableName);
162    }
163    Table t = TEST_UTIL.createTable(tableName, F, 10);
164    long now = EnvironmentEdgeManager.currentTime();
165    ManualEnvironmentEdge me = new ManualEnvironmentEdge();
166    me.setValue(now);
167    EnvironmentEdgeManagerTestHelper.injectEdge(me);
168    // 2s in the past
169    long ts = now - 2000;
170
171    Put p = new Put(R);
172    p.addColumn(F, Q, ts, Q);
173    t.put(p);
174    p = new Put(R);
175    p.addColumn(F, Q, ts + 1, Q);
176    t.put(p);
177
178    // Set the TTL override to 3s
179    p = new Put(R);
180    p.setAttribute("ttl", new byte[] {});
181    p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L));
182    t.put(p);
183    // these two should still be there
184    Get g = new Get(R);
185    g.readAllVersions();
186    Result r = t.get(g);
187    // still there?
188    assertEquals(2, r.size());
189
190    TEST_UTIL.flush(tableName);
191    TEST_UTIL.compact(tableName, true);
192
193    g = new Get(R);
194    g.readAllVersions();
195    r = t.get(g);
196    // still there?
197    assertEquals(2, r.size());
198
199    // roll time forward 2s.
200    me.setValue(now + 2000);
201    // now verify that data eventually does expire
202    g = new Get(R);
203    g.readAllVersions();
204    r = t.get(g);
205    // should be gone now
206    assertEquals(0, r.size());
207    t.close();
208    EnvironmentEdgeManager.reset();
209  }
210
211  public static class ScanObserver implements RegionCoprocessor, RegionObserver {
212    private final ConcurrentMap<TableName, Long> ttls = new ConcurrentHashMap<>();
213    private final ConcurrentMap<TableName, Integer> versions = new ConcurrentHashMap<>();
214
215    @Override
216    public Optional<RegionObserver> getRegionObserver() {
217      return Optional.of(this);
218    }
219
220    // lame way to communicate with the coprocessor,
221    // since it is loaded by a different class loader
222    @Override
223    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> c,
224      final Put put, final WALEdit edit, final Durability durability) throws IOException {
225      if (put.getAttribute("ttl") != null) {
226        Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0);
227        ttls.put(
228          TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
229            cell.getQualifierLength())),
230          Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
231        c.bypass();
232      } else if (put.getAttribute("versions") != null) {
233        Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0);
234        versions.put(
235          TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
236            cell.getQualifierLength())),
237          Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
238        c.bypass();
239      }
240    }
241
242    private InternalScanner wrap(Store store, InternalScanner scanner) {
243      Long ttl = this.ttls.get(store.getTableName());
244      Integer version = this.versions.get(store.getTableName());
245      return new DelegatingInternalScanner(scanner) {
246
247        private byte[] row;
248
249        private byte[] qualifier;
250
251        private int count;
252
253        private Predicate<Cell> checkTtl(long now, long ttl) {
254          return c -> now - c.getTimestamp() > ttl;
255        }
256
257        private Predicate<Cell> checkVersion(Cell firstCell, int version) {
258          if (version == 0) {
259            return c -> true;
260          } else {
261            if (row == null || !CellUtil.matchingRows(firstCell, row)) {
262              row = CellUtil.cloneRow(firstCell);
263              // reset qualifier as there is a row change
264              qualifier = null;
265            }
266            return c -> {
267              if (qualifier != null && CellUtil.matchingQualifier(c, qualifier)) {
268                if (count >= version) {
269                  return true;
270                }
271                count++;
272                return false;
273              } else { // qualifier switch
274                qualifier = CellUtil.cloneQualifier(c);
275                count = 1;
276                return false;
277              }
278            };
279          }
280        }
281
282        @Override
283        public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
284          throws IOException {
285          boolean moreRows = scanner.next(result, scannerContext);
286          if (result.isEmpty()) {
287            return moreRows;
288          }
289          long now = EnvironmentEdgeManager.currentTime();
290          Predicate<Cell> predicate = null;
291          if (ttl != null) {
292            predicate = checkTtl(now, ttl);
293          }
294          if (version != null) {
295            Predicate<Cell> vp = checkVersion((Cell) result.get(0), version);
296            if (predicate != null) {
297              predicate = predicate.and(vp);
298            } else {
299              predicate = vp;
300            }
301          }
302          if (predicate != null) {
303            ((List<Cell>) result).removeIf(predicate);
304          }
305          return moreRows;
306        }
307      };
308    }
309
310    @Override
311    public InternalScanner preFlush(ObserverContext<? extends RegionCoprocessorEnvironment> c,
312      Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
313      return wrap(store, scanner);
314    }
315
316    @Override
317    public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> c,
318      Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
319      CompactionRequest request) throws IOException {
320      return wrap(store, scanner);
321    }
322
323    @Override
324    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get,
325      List<Cell> result) throws IOException {
326      TableName tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName();
327      Long ttl = this.ttls.get(tableName);
328      if (ttl != null) {
329        get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, get.getTimeRange().getMax());
330      }
331      Integer version = this.versions.get(tableName);
332      if (version != null) {
333        get.readVersions(version);
334      }
335    }
336
337    @Override
338    public void preScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c, Scan scan)
339      throws IOException {
340      Region region = c.getEnvironment().getRegion();
341      TableName tableName = region.getTableDescriptor().getTableName();
342      Long ttl = this.ttls.get(tableName);
343      if (ttl != null) {
344        scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, scan.getTimeRange().getMax());
345      }
346      Integer version = this.versions.get(tableName);
347      if (version != null) {
348        scan.readVersions(version);
349      }
350    }
351  }
352}