001/**
002 * Copyright The Apache Software Foundation
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one or more
005 * contributor license agreements. See the NOTICE file distributed with this
006 * work for additional information regarding copyright ownership. The ASF
007 * licenses this file to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance with the License.
009 * You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
016 * License for the specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.hadoop.hbase.coprocessor.example;
020
021import java.io.IOException;
022import java.util.Optional;
023import java.util.OptionalLong;
024import org.apache.curator.framework.CuratorFramework;
025import org.apache.curator.framework.CuratorFrameworkFactory;
026import org.apache.curator.framework.recipes.cache.ChildData;
027import org.apache.curator.framework.recipes.cache.NodeCache;
028import org.apache.curator.retry.RetryForever;
029import org.apache.hadoop.hbase.CoprocessorEnvironment;
030import org.apache.hadoop.hbase.coprocessor.ObserverContext;
031import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
032import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
033import org.apache.hadoop.hbase.coprocessor.RegionObserver;
034import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
035import org.apache.hadoop.hbase.regionserver.ScanOptions;
036import org.apache.hadoop.hbase.regionserver.ScanType;
037import org.apache.hadoop.hbase.regionserver.Store;
038import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
039import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
042import org.apache.yetus.audience.InterfaceAudience;
043
044/**
045 * This is an example showing how a RegionObserver could configured via ZooKeeper in order to
046 * control a Region compaction, flush, and scan policy. This also demonstrated the use of shared
047 * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} state. See
048 * {@link RegionCoprocessorEnvironment#getSharedData()}.
049 * <p>
050 * This would be useful for an incremental backup tool, which would indicate the last time of a
051 * successful backup via ZK and instruct HBase that to safely delete the data which has already been
052 * backup.
053 */
054@InterfaceAudience.Private
055public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObserver {
056
057  @Override
058  public Optional<RegionObserver> getRegionObserver() {
059    return Optional.of(this);
060  }
061
062  // The zk ensemble info is put in hbase config xml with given custom key.
063  public static final String ZK_ENSEMBLE_KEY = "ZooKeeperScanPolicyObserver.zookeeper.ensemble";
064  public static final String ZK_SESSION_TIMEOUT_KEY =
065      "ZooKeeperScanPolicyObserver.zookeeper.session.timeout";
066  public static final int ZK_SESSION_TIMEOUT_DEFAULT = 30 * 1000; // 30 secs
067  public static final String NODE = "/backup/example/lastbackup";
068  private static final String ZKKEY = "ZK";
069
070  private NodeCache cache;
071
072  /**
073   * Internal watcher that keep "data" up to date asynchronously.
074   */
075  private static final class ZKDataHolder {
076
077    private final String ensemble;
078
079    private final int sessionTimeout;
080
081    private CuratorFramework client;
082
083    private NodeCache cache;
084
085    private int ref;
086
087    public ZKDataHolder(String ensemble, int sessionTimeout) {
088      this.ensemble = ensemble;
089      this.sessionTimeout = sessionTimeout;
090    }
091
092    private void create() throws Exception {
093      client =
094          CuratorFrameworkFactory.builder().connectString(ensemble).sessionTimeoutMs(sessionTimeout)
095              .retryPolicy(new RetryForever(1000)).canBeReadOnly(true).build();
096      client.start();
097      cache = new NodeCache(client, NODE);
098      cache.start(true);
099    }
100
101    private void close() {
102      if (cache != null) {
103        try {
104          cache.close();
105        } catch (IOException e) {
106          // should not happen
107          throw new AssertionError(e);
108        }
109        cache = null;
110      }
111      if (client != null) {
112        client.close();
113        client = null;
114      }
115    }
116
117    public synchronized NodeCache acquire() throws Exception {
118      if (ref == 0) {
119        try {
120          create();
121        } catch (Exception e) {
122          close();
123          throw e;
124        }
125      }
126      ref++;
127      return cache;
128    }
129
130    public synchronized void release() {
131      ref--;
132      if (ref == 0) {
133        close();
134      }
135    }
136  }
137
138  @Override
139  public void start(CoprocessorEnvironment env) throws IOException {
140    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
141    try {
142      this.cache = ((ZKDataHolder) renv.getSharedData().computeIfAbsent(ZKKEY, k -> {
143        String ensemble = renv.getConfiguration().get(ZK_ENSEMBLE_KEY);
144        int sessionTimeout =
145            renv.getConfiguration().getInt(ZK_SESSION_TIMEOUT_KEY, ZK_SESSION_TIMEOUT_DEFAULT);
146        return new ZKDataHolder(ensemble, sessionTimeout);
147      })).acquire();
148    } catch (Exception e) {
149      throw new IOException(e);
150    }
151  }
152
153  @Override
154  public void stop(CoprocessorEnvironment env) throws IOException {
155    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
156    this.cache = null;
157    ((ZKDataHolder) renv.getSharedData().get(ZKKEY)).release();
158  }
159
160  private OptionalLong getExpireBefore() {
161    ChildData data = cache.getCurrentData();
162    if (data == null) {
163      return OptionalLong.empty();
164    }
165    byte[] bytes = data.getData();
166    if (bytes == null || bytes.length != Long.BYTES) {
167      return OptionalLong.empty();
168    }
169    return OptionalLong.of(Bytes.toLong(bytes));
170  }
171
172  private void resetTTL(ScanOptions options) {
173    OptionalLong expireBefore = getExpireBefore();
174    if (!expireBefore.isPresent()) {
175      return;
176    }
177    options.setTTL(EnvironmentEdgeManager.currentTime() - expireBefore.getAsLong());
178  }
179
180  @Override
181  public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
182      ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
183    resetTTL(options);
184  }
185
186  @Override
187  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
188      ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
189      CompactionRequest request) throws IOException {
190    resetTTL(options);
191  }
192}