001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor.example;
019
020import java.io.IOException;
021import java.util.Optional;
022import java.util.OptionalLong;
023import org.apache.hadoop.hbase.CoprocessorEnvironment;
024import org.apache.hadoop.hbase.coprocessor.ObserverContext;
025import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
026import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
027import org.apache.hadoop.hbase.coprocessor.RegionObserver;
028import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
029import org.apache.hadoop.hbase.regionserver.ScanOptions;
030import org.apache.hadoop.hbase.regionserver.ScanType;
031import org.apache.hadoop.hbase.regionserver.Store;
032import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
033import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.zookeeper.CreateMode;
038import org.apache.zookeeper.KeeperException;
039import org.apache.zookeeper.KeeperException.NodeExistsException;
040import org.apache.zookeeper.WatchedEvent;
041import org.apache.zookeeper.Watcher;
042import org.apache.zookeeper.ZooDefs;
043import org.apache.zookeeper.ZooKeeper;
044import org.apache.zookeeper.data.Stat;
045
046/**
047 * This is an example showing how a RegionObserver could be configured via ZooKeeper in order to
048 * control a Region compaction, flush, and scan policy. This also demonstrated the use of shared
049 * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} state. See
050 * {@link RegionCoprocessorEnvironment#getSharedData()}.
051 * <p>
052 * This would be useful for an incremental backup tool, which would indicate the last time of a
053 * successful backup via ZK and instruct HBase that to safely delete the data which has already been
054 * backup.
055 */
056@InterfaceAudience.Private
057public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObserver {
058
059  @Override
060  public Optional<RegionObserver> getRegionObserver() {
061    return Optional.of(this);
062  }
063
064  // The zk ensemble info is put in hbase config xml with given custom key.
065  public static final String ZK_ENSEMBLE_KEY = "ZooKeeperScanPolicyObserver.zookeeper.ensemble";
066  public static final String ZK_SESSION_TIMEOUT_KEY =
067    "ZooKeeperScanPolicyObserver.zookeeper.session.timeout";
068  public static final int ZK_SESSION_TIMEOUT_DEFAULT = 30 * 1000; // 30 secs
069  public static final String NODE = "/backup/example/lastbackup";
070  private static final String ZKKEY = "ZK";
071
072  private ZKDataHolder cache;
073
074  /**
075   * Internal watcher that keep "data" up to date asynchronously.
076   */
077  private static final class ZKDataHolder implements Watcher {
078
079    private final String ensemble;
080
081    private final int sessionTimeout;
082
083    private ZooKeeper zk;
084
085    private int ref;
086
087    private byte[] data;
088
089    public ZKDataHolder(String ensemble, int sessionTimeout) {
090      this.ensemble = ensemble;
091      this.sessionTimeout = sessionTimeout;
092    }
093
094    private void open() throws IOException {
095      if (zk == null) {
096        zk = new ZooKeeper(ensemble, sessionTimeout, this);
097        // In a real application, you'd probably want to create these Znodes externally,
098        // and not from the coprocessor
099        StringBuffer createdPath = new StringBuffer();
100        byte[] empty = new byte[0];
101        for (String element : NODE.split("/")) {
102          if (element.isEmpty()) {
103            continue;
104          }
105          try {
106            createdPath = createdPath.append("/").append(element);
107            zk.create(createdPath.toString(), empty, ZooDefs.Ids.OPEN_ACL_UNSAFE,
108              CreateMode.PERSISTENT);
109          } catch (NodeExistsException e) {
110            // That's OK
111          } catch (KeeperException e) {
112            throw new IOException(e);
113          } catch (InterruptedException e) {
114            // Restore interrupt status
115            Thread.currentThread().interrupt();
116          }
117        }
118      }
119    }
120
121    private void close() {
122      if (zk != null) {
123        try {
124          zk.close();
125          zk = null;
126        } catch (InterruptedException e) {
127          // Restore interrupt status
128          Thread.currentThread().interrupt();
129        }
130      }
131    }
132
133    public synchronized byte[] getData() {
134      if (ref == 0) {
135        Stat stat = null;
136        try {
137          stat = zk.exists(NODE, this);
138        } catch (KeeperException e) {
139          // Value will always be null if the initial connection fails.
140          // In a real application you probably want to try to
141          // periodically re-connect in this case.
142        } catch (InterruptedException e) {
143          // Restore interrupt status
144          Thread.currentThread().interrupt();
145        }
146        if (stat != null) {
147          refresh();
148        }
149      }
150      ref++;
151      return data;
152    }
153
154    private synchronized void refresh() {
155      try {
156        data = zk.getData(NODE, this, null);
157      } catch (KeeperException e) {
158        // Value will always be null if this fails (as we cannot set the new watcher)
159        // In a real application you probably want to try to
160        // periodically re-connect in this case.
161      } catch (InterruptedException e) {
162        // Restore interrupt status
163        Thread.currentThread().interrupt();
164      }
165    }
166
167    @Override
168    public void process(WatchedEvent event) {
169      refresh();
170    }
171  }
172
173  @Override
174  public void start(CoprocessorEnvironment env) throws IOException {
175    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
176    try {
177      this.cache = ((ZKDataHolder) renv.getSharedData().computeIfAbsent(ZKKEY, k -> {
178        String ensemble = renv.getConfiguration().get(ZK_ENSEMBLE_KEY);
179        int sessionTimeout =
180          renv.getConfiguration().getInt(ZK_SESSION_TIMEOUT_KEY, ZK_SESSION_TIMEOUT_DEFAULT);
181        return new ZKDataHolder(ensemble, sessionTimeout);
182      }));
183      cache.open();
184    } catch (Exception e) {
185      throw new IOException(e);
186    }
187  }
188
189  @Override
190  public void stop(CoprocessorEnvironment env) throws IOException {
191    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
192    this.cache = null;
193    ((ZKDataHolder) renv.getSharedData().get(ZKKEY)).close();
194  }
195
196  private OptionalLong getExpireBefore() {
197    byte[] bytes = cache.getData();
198    if (bytes == null || bytes.length != Long.BYTES) {
199      return OptionalLong.empty();
200    }
201    return OptionalLong.of(Bytes.toLong(bytes));
202  }
203
204  private void resetTTL(ScanOptions options) {
205    OptionalLong expireBefore = getExpireBefore();
206    if (!expireBefore.isPresent()) {
207      return;
208    }
209    options.setTTL(EnvironmentEdgeManager.currentTime() - expireBefore.getAsLong());
210  }
211
212  @Override
213  public void preFlushScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c,
214    Store store, ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
215    resetTTL(options);
216  }
217
218  @Override
219  public void preCompactScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c,
220    Store store, ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
221    CompactionRequest request) throws IOException {
222    resetTTL(options);
223  }
224}