001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import org.apache.hadoop.hbase.HBaseClassTestRule;
021import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
022import org.apache.hadoop.hbase.HBaseTestingUtil;
023import org.junit.ClassRule;
024
// This test is deliberately not in the o.a.h.h.regionserver package,
// in order to make sure all required classes/methods are available from outside it.
028
029import static org.junit.Assert.assertEquals;
030
031import java.io.IOException;
032import java.util.Collection;
033import java.util.List;
034import java.util.Optional;
035import java.util.concurrent.ConcurrentHashMap;
036import java.util.concurrent.ConcurrentMap;
037import java.util.function.Predicate;
038
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.Cell;
041import org.apache.hadoop.hbase.CellUtil;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Durability;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.Scan;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
050import org.apache.hadoop.hbase.coprocessor.ObserverContext;
051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
053import org.apache.hadoop.hbase.coprocessor.RegionObserver;
054import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
055import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
056import org.apache.hadoop.hbase.regionserver.InternalScanner;
057import org.apache.hadoop.hbase.regionserver.Region;
058import org.apache.hadoop.hbase.regionserver.ScanType;
059import org.apache.hadoop.hbase.regionserver.ScannerContext;
060import org.apache.hadoop.hbase.regionserver.Store;
061import org.apache.hadoop.hbase.regionserver.StoreScanner;
062import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
063import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
064import org.apache.hadoop.hbase.testclassification.MediumTests;
065import org.apache.hadoop.hbase.testclassification.MiscTests;
066import org.apache.hadoop.hbase.wal.WALEdit;
067import org.junit.AfterClass;
068import org.junit.BeforeClass;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.junit.runner.RunWith;
072import org.junit.runners.Parameterized;
073import org.junit.runners.Parameterized.Parameters;
074
075@Category({ MiscTests.class, MediumTests.class })
076@RunWith(Parameterized.class)
077public class TestCoprocessorScanPolicy {
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081      HBaseClassTestRule.forClass(TestCoprocessorScanPolicy.class);
082
083  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
084  private static final byte[] F = Bytes.toBytes("fam");
085  private static final byte[] Q = Bytes.toBytes("qual");
086  private static final byte[] R = Bytes.toBytes("row");
087
088  @BeforeClass
089  public static void setUpBeforeClass() throws Exception {
090    Configuration conf = TEST_UTIL.getConfiguration();
091    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName());
092    TEST_UTIL.startMiniCluster();
093  }
094
095  @AfterClass
096  public static void tearDownAfterClass() throws Exception {
097    TEST_UTIL.shutdownMiniCluster();
098  }
099
100  @Parameters
101  public static Collection<Object[]> parameters() {
102    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED;
103  }
104
105  public TestCoprocessorScanPolicy(boolean parallelSeekEnable) {
106    TEST_UTIL.getMiniHBaseCluster().getConf()
107        .setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable);
108  }
109
110  @Test
111  public void testBaseCases() throws Exception {
112    TableName tableName = TableName.valueOf("baseCases");
113    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
114      TEST_UTIL.deleteTable(tableName);
115    }
116    Table t = TEST_UTIL.createTable(tableName, F, 10);
117    // insert 3 versions
118    long now = EnvironmentEdgeManager.currentTime();
119    Put p = new Put(R);
120    p.addColumn(F, Q, now, Q);
121    t.put(p);
122    p = new Put(R);
123    p.addColumn(F, Q, now + 1, Q);
124    t.put(p);
125    p = new Put(R);
126    p.addColumn(F, Q, now + 2, Q);
127    t.put(p);
128
129    Get g = new Get(R);
130    g.readVersions(10);
131    Result r = t.get(g);
132    assertEquals(3, r.size());
133
134    TEST_UTIL.flush(tableName);
135    TEST_UTIL.compact(tableName, true);
136
137    // still visible after a flush/compaction
138    r = t.get(g);
139    assertEquals(3, r.size());
140
141    // set the version override to 2
142    p = new Put(R);
143    p.setAttribute("versions", new byte[] {});
144    p.addColumn(F, tableName.getName(), Bytes.toBytes(2));
145    t.put(p);
146
147    // only 2 versions now
148    r = t.get(g);
149    assertEquals(2, r.size());
150
151    TEST_UTIL.flush(tableName);
152    TEST_UTIL.compact(tableName, true);
153
154    // still 2 versions after a flush/compaction
155    r = t.get(g);
156    assertEquals(2, r.size());
157
158    // insert a new version
159    p.addColumn(F, Q, now + 3, Q);
160    t.put(p);
161
162    // still 2 versions
163    r = t.get(g);
164    assertEquals(2, r.size());
165
166    t.close();
167  }
168
169  @Test
170  public void testTTL() throws Exception {
171    TableName tableName = TableName.valueOf("testTTL");
172    if (TEST_UTIL.getAdmin().tableExists(tableName)) {
173      TEST_UTIL.deleteTable(tableName);
174    }
175    Table t = TEST_UTIL.createTable(tableName, F, 10);
176    long now = EnvironmentEdgeManager.currentTime();
177    ManualEnvironmentEdge me = new ManualEnvironmentEdge();
178    me.setValue(now);
179    EnvironmentEdgeManagerTestHelper.injectEdge(me);
180    // 2s in the past
181    long ts = now - 2000;
182
183    Put p = new Put(R);
184    p.addColumn(F, Q, ts, Q);
185    t.put(p);
186    p = new Put(R);
187    p.addColumn(F, Q, ts + 1, Q);
188    t.put(p);
189
190    // Set the TTL override to 3s
191    p = new Put(R);
192    p.setAttribute("ttl", new byte[] {});
193    p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L));
194    t.put(p);
195    // these two should still be there
196    Get g = new Get(R);
197    g.readAllVersions();
198    Result r = t.get(g);
199    // still there?
200    assertEquals(2, r.size());
201
202    TEST_UTIL.flush(tableName);
203    TEST_UTIL.compact(tableName, true);
204
205    g = new Get(R);
206    g.readAllVersions();
207    r = t.get(g);
208    // still there?
209    assertEquals(2, r.size());
210
211    // roll time forward 2s.
212    me.setValue(now + 2000);
213    // now verify that data eventually does expire
214    g = new Get(R);
215    g.readAllVersions();
216    r = t.get(g);
217    // should be gone now
218    assertEquals(0, r.size());
219    t.close();
220    EnvironmentEdgeManager.reset();
221  }
222
223  public static class ScanObserver implements RegionCoprocessor, RegionObserver {
224    private final ConcurrentMap<TableName, Long> ttls = new ConcurrentHashMap<>();
225    private final ConcurrentMap<TableName, Integer> versions = new ConcurrentHashMap<>();
226
227    @Override
228    public Optional<RegionObserver> getRegionObserver() {
229      return Optional.of(this);
230    }
231
232    // lame way to communicate with the coprocessor,
233    // since it is loaded by a different class loader
234    @Override
235    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
236        final WALEdit edit, final Durability durability) throws IOException {
237      if (put.getAttribute("ttl") != null) {
238        Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0);
239        ttls.put(
240          TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
241            cell.getQualifierLength())),
242          Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
243        c.bypass();
244      } else if (put.getAttribute("versions") != null) {
245        Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0);
246        versions.put(
247          TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
248            cell.getQualifierLength())),
249          Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
250        c.bypass();
251      }
252    }
253
254    private InternalScanner wrap(Store store, InternalScanner scanner) {
255      Long ttl = this.ttls.get(store.getTableName());
256      Integer version = this.versions.get(store.getTableName());
257      return new DelegatingInternalScanner(scanner) {
258
259        private byte[] row;
260
261        private byte[] qualifier;
262
263        private int count;
264
265        private Predicate<Cell> checkTtl(long now, long ttl) {
266          return c -> now - c.getTimestamp() > ttl;
267        }
268
269        private Predicate<Cell> checkVersion(Cell firstCell, int version) {
270          if (version == 0) {
271            return c -> true;
272          } else {
273            if (row == null || !CellUtil.matchingRows(firstCell, row)) {
274              row = CellUtil.cloneRow(firstCell);
275              // reset qualifier as there is a row change
276              qualifier = null;
277            }
278            return c -> {
279              if (qualifier != null && CellUtil.matchingQualifier(c, qualifier)) {
280                if (count >= version) {
281                  return true;
282                }
283                count++;
284                return false;
285              } else { // qualifier switch
286                qualifier = CellUtil.cloneQualifier(c);
287                count = 1;
288                return false;
289              }
290            };
291          }
292        }
293
294        @Override
295        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
296          boolean moreRows = scanner.next(result, scannerContext);
297          if (result.isEmpty()) {
298            return moreRows;
299          }
300          long now = EnvironmentEdgeManager.currentTime();
301          Predicate<Cell> predicate = null;
302          if (ttl != null) {
303            predicate = checkTtl(now, ttl);
304          }
305          if (version != null) {
306            Predicate<Cell> vp = checkVersion(result.get(0), version);
307            if (predicate != null) {
308              predicate = predicate.and(vp);
309            } else {
310              predicate = vp;
311            }
312          }
313          if (predicate != null) {
314            result.removeIf(predicate);
315          }
316          return moreRows;
317        }
318      };
319    }
320
321    @Override
322    public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
323        InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
324      return wrap(store, scanner);
325    }
326
327    @Override
328    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
329        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
330        CompactionRequest request) throws IOException {
331      return wrap(store, scanner);
332    }
333
334    @Override
335    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
336        List<Cell> result) throws IOException {
337      TableName tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName();
338      Long ttl = this.ttls.get(tableName);
339      if (ttl != null) {
340        get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, get.getTimeRange().getMax());
341      }
342      Integer version = this.versions.get(tableName);
343      if (version != null) {
344        get.readVersions(version);
345      }
346    }
347
348    @Override
349    public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
350        throws IOException {
351      Region region = c.getEnvironment().getRegion();
352      TableName tableName = region.getTableDescriptor().getTableName();
353      Long ttl = this.ttls.get(tableName);
354      if (ttl != null) {
355        scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, scan.getTimeRange().getMax());
356      }
357      Integer version = this.versions.get(tableName);
358      if (version != null) {
359        scan.readVersions(version);
360      }
361    }
362  }
363}