/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example;

import java.io.IOException;
import java.util.Optional;
import java.util.OptionalLong;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryForever;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * This is an example showing how a RegionObserver could be configured via ZooKeeper in order to
 * control a Region's compaction, flush, and scan policy. This also demonstrates the use of shared
 * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} state. See
 * {@link RegionCoprocessorEnvironment#getSharedData()}.
 * <p>
 * This would be useful for an incremental backup tool, which would indicate the last time of a
 * successful backup via ZK and instruct HBase to safely delete the data which has already been
 * backed up.
 */
@InterfaceAudience.Private
public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObserver {

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  // The ZK ensemble info is put in the hbase config xml under the custom keys below.
  public static final String ZK_ENSEMBLE_KEY = "ZooKeeperScanPolicyObserver.zookeeper.ensemble";
  public static final String ZK_SESSION_TIMEOUT_KEY =
    "ZooKeeperScanPolicyObserver.zookeeper.session.timeout";
  public static final int ZK_SESSION_TIMEOUT_DEFAULT = 30 * 1000; // 30 secs
  public static final String NODE = "/backup/example/lastbackup";
  private static final String ZKKEY = "ZK";
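
  /*
   * A minimal sketch of how this observer might be wired up. The ensemble address and timeout
   * value below are illustrative placeholders, not required settings; only the property names
   * come from the constants above (hbase.coprocessor.region.classes is the standard HBase key
   * for loading region coprocessors):
   *
   *   <property>
   *     <name>hbase.coprocessor.region.classes</name>
   *     <value>org.apache.hadoop.hbase.coprocessor.example.ZooKeeperScanPolicyObserver</value>
   *   </property>
   *   <property>
   *     <name>ZooKeeperScanPolicyObserver.zookeeper.ensemble</name>
   *     <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
   *   </property>
   *   <property>
   *     <name>ZooKeeperScanPolicyObserver.zookeeper.session.timeout</name>
   *     <value>30000</value>
   *   </property>
   */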

  private NodeCache cache;

  /**
   * Internal watcher that keeps the "data" up to date asynchronously.
   */
  private static final class ZKDataHolder {

    private final String ensemble;

    private final int sessionTimeout;

    private CuratorFramework client;

    private NodeCache cache;

    private int ref;

    public ZKDataHolder(String ensemble, int sessionTimeout) {
      this.ensemble = ensemble;
      this.sessionTimeout = sessionTimeout;
    }

    private void create() throws Exception {
      client =
        CuratorFrameworkFactory.builder().connectString(ensemble).sessionTimeoutMs(sessionTimeout)
          .retryPolicy(new RetryForever(1000)).canBeReadOnly(true).build();
      client.start();
      cache = new NodeCache(client, NODE);
      cache.start(true);
    }

    private void close() {
      if (cache != null) {
        try {
          cache.close();
        } catch (IOException e) {
          // should not happen
          throw new AssertionError(e);
        }
        cache = null;
      }
      if (client != null) {
        client.close();
        client = null;
      }
    }

    public synchronized NodeCache acquire() throws Exception {
      if (ref == 0) {
        try {
          create();
        } catch (Exception e) {
          close();
          throw e;
        }
      }
      ref++;
      return cache;
    }

    public synchronized void release() {
      ref--;
      if (ref == 0) {
        close();
      }
    }
  }

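  /*
   * A single ZKDataHolder is shared by all regions on a region server via getSharedData():
   * acquire() and release() reference-count the regions using it, so the Curator client and
   * NodeCache are created when the first region starts and closed when the last region stops.
   */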
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
    try {
      this.cache = ((ZKDataHolder) renv.getSharedData().computeIfAbsent(ZKKEY, k -> {
        String ensemble = renv.getConfiguration().get(ZK_ENSEMBLE_KEY);
        int sessionTimeout =
          renv.getConfiguration().getInt(ZK_SESSION_TIMEOUT_KEY, ZK_SESSION_TIMEOUT_DEFAULT);
        return new ZKDataHolder(ensemble, sessionTimeout);
      })).acquire();
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
    this.cache = null;
    ((ZKDataHolder) renv.getSharedData().get(ZKKEY)).release();
  }

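  /*
   * The znode is expected to hold the last successful backup time as an 8-byte long, as produced
   * by Bytes.toBytes(long). Anything else (missing node, missing payload, wrong length) is
   * treated as "no backup recorded yet" and leaves the scan policy untouched.
   */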
  private OptionalLong getExpireBefore() {
    ChildData data = cache.getCurrentData();
    if (data == null) {
      return OptionalLong.empty();
    }
    byte[] bytes = data.getData();
    if (bytes == null || bytes.length != Long.BYTES) {
      return OptionalLong.empty();
    }
    return OptionalLong.of(Bytes.toLong(bytes));
  }
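
  /*
   * A hypothetical sketch of the producer side: an incremental backup tool could publish its last
   * successful backup time to the znode with Curator. The ensemble string is a placeholder and the
   * tool itself is an assumption; the znode path (NODE) and the 8-byte long payload match what
   * getExpireBefore() above expects. Exception handling is omitted for brevity.
   *
   *   try (CuratorFramework client =
   *       CuratorFrameworkFactory.newClient("zk1.example.com:2181", new RetryForever(1000))) {
   *     client.start();
   *     byte[] lastBackupTime = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
   *     if (client.checkExists().forPath(NODE) == null) {
   *       client.create().creatingParentsIfNeeded().forPath(NODE, lastBackupTime);
   *     } else {
   *       client.setData().forPath(NODE, lastBackupTime);
   *     }
   *   }
   */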

  private void resetTTL(ScanOptions options) {
    OptionalLong expireBefore = getExpireBefore();
    if (!expireBefore.isPresent()) {
      return;
    }
    options.setTTL(EnvironmentEdgeManager.currentTime() - expireBefore.getAsLong());
  }
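
  /*
   * The two hooks below apply the adjusted TTL whenever a flush or compaction scanner is opened.
   * Worked example with illustrative numbers: if the znode holds lastBackup = 1_000 and
   * currentTime() = 5_000, the TTL becomes 4_000 ms, so any cell with a timestamp older than
   * now - TTL = lastBackup is treated as expired and dropped during the flush or compaction.
   */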

  @Override
  public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
    resetTTL(options);
  }

  @Override
  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
    CompactionRequest request) throws IOException {
    resetTTL(options);
  }
}