View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.coprocessor.example;
19  
20  import java.io.IOException;
21  import java.util.Collections;
22  import java.util.List;
23  import java.util.NavigableSet;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.CoprocessorEnvironment;
28  import org.apache.hadoop.hbase.HConstants;
29  import org.apache.hadoop.hbase.client.Scan;
30  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
31  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
32  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
33  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
34  import org.apache.hadoop.hbase.regionserver.InternalScanner;
35  import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
36  import org.apache.hadoop.hbase.regionserver.ScanType;
37  import org.apache.hadoop.hbase.regionserver.Store;
38  import org.apache.hadoop.hbase.regionserver.StoreScanner;
39  import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
42  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
43  import org.apache.zookeeper.KeeperException;
44  import org.apache.zookeeper.WatchedEvent;
45  import org.apache.zookeeper.Watcher;
46  import org.apache.zookeeper.ZooKeeper;
47  
/**
 * This is an example showing how a RegionObserver could be configured
 * via ZooKeeper in order to control a Region compaction, flush, and scan policy.
 *
 * This also demonstrates the use of shared {@link RegionObserver} state.
 * See {@link RegionCoprocessorEnvironment#getSharedData()}.
 *
 * This would be useful for an incremental backup tool, which would indicate the last
 * time of a successful backup via ZK and instruct HBase to not delete data that was
 * inserted since (based on wall clock time).
 *
 * This implements org.apache.zookeeper.Watcher directly instead of using
 * {@link ZooKeeperWatcher}, because RegionObservers come and go and currently
 * listeners registered with ZooKeeperWatcher cannot be removed.
 */
63  public class ZooKeeperScanPolicyObserver extends BaseRegionObserver {
64    public static String node = "/backup/example/lastbackup";
65    public static String zkkey = "ZK";
66    private static final Log LOG = LogFactory.getLog(ZooKeeperScanPolicyObserver.class);
67  
68    /**
69     * Internal watcher that keep "data" up to date asynchronously.
70     */
71    private static class ZKWatcher implements Watcher {
72      private byte[] data = null;
73      private ZooKeeper zk;
74      private volatile boolean needSetup = true;
75      private volatile long lastSetupTry = 0;
76  
77      public ZKWatcher(ZooKeeper zk) {
78        this.zk = zk;
79        // trigger the listening
80        getData();
81      }
82  
83      /**
84       * Get the maintained data. In case of any ZK exceptions this will retry
85       * establishing the connection (but not more than twice/minute).
86       *
87       * getData is on the critical path, so make sure it is fast unless there is
88       * a problem (network partion, ZK ensemble down, etc)
89       * Make sure at most one (unlucky) thread retries and other threads don't pile up
90       * while that threads tries to recreate the connection.
91       *
92       * @return the last know version of the data
93       */
94      public byte[] getData() {
95        // try at most twice/minute
96        if (needSetup && EnvironmentEdgeManager.currentTimeMillis() > lastSetupTry + 30000) {
97          synchronized (this) {
98            // make sure only one thread tries to reconnect
99            if (needSetup) {
100             needSetup = false;
101           } else {
102             return data;
103           }
104         }
105         // do this without the lock held to avoid threads piling up on this lock,
106         // as it can take a while
107         try {
108           LOG.debug("Connecting to ZK");
109           // record this attempt
110           lastSetupTry = EnvironmentEdgeManager.currentTimeMillis();
111           if (zk.exists(node, false) != null) {
112             data = zk.getData(node, this, null);
113             LOG.debug("Read synchronously: "+(data == null ? "null" : Bytes.toLong(data)));
114           } else {
115             zk.exists(node, this);
116           }
117         } catch (Exception x) {
118           // try again if this fails
119           needSetup = true;
120         }
121       }
122       return data;
123     }
124 
125     @Override
126     public void process(WatchedEvent event) {
127       switch(event.getType()) {
128       case NodeDataChanged:
129       case NodeCreated:
130       try {
131         // get data and re-watch
132         data = zk.getData(node, this, null);
133         LOG.debug("Read asynchronously: "+(data == null ? "null" : Bytes.toLong(data)));
134       } catch (InterruptedException ix) {
135       } catch (KeeperException kx) {
136         needSetup = true;
137       }
138       break;
139 
140       case NodeDeleted:
141       try {
142         // just re-watch
143         zk.exists(node, this);
144         data = null;
145       } catch (InterruptedException ix) {
146       } catch (KeeperException kx) {
147         needSetup = true;
148       }
149       break;
150 
151       default:
152         // ignore
153       }
154     }
155   }
156 
157   @Override
158   public void start(CoprocessorEnvironment e) throws IOException {
159     RegionCoprocessorEnvironment re = (RegionCoprocessorEnvironment) e;
160     if (!re.getSharedData().containsKey(zkkey)) {
161       // there is a short race here
162       // in the worst case we create a watcher that will be notified once
163       re.getSharedData().putIfAbsent(
164           zkkey,
165           new ZKWatcher(re.getRegionServerServices().getZooKeeper()
166               .getRecoverableZooKeeper().getZooKeeper()));
167     }
168   }
169 
170   @Override
171   public void stop(CoprocessorEnvironment e) throws IOException {
172     // nothing to do here
173   }
174 
175   protected ScanInfo getScanInfo(Store store, RegionCoprocessorEnvironment e) {
176     byte[] data = ((ZKWatcher)e.getSharedData().get(zkkey)).getData();
177     if (data == null) {
178       return null;
179     }
180     ScanInfo oldSI = store.getScanInfo();
181     if (oldSI.getTtl() == Long.MAX_VALUE) {
182       return null;
183     }
184     long ttl =  Math.max(EnvironmentEdgeManager.currentTimeMillis() - Bytes.toLong(data), oldSI.getTtl());    
185     return new ScanInfo(store.getFamily(), ttl,
186         oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
187   }
188 
189   @Override
190   public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
191       Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
192     Store.ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
193     if (scanInfo == null) {
194       // take default action
195       return null;
196     }
197     Scan scan = new Scan();
198     scan.setMaxVersions(scanInfo.getMaxVersions());
199     return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
200         ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(),
201         HConstants.OLDEST_TIMESTAMP);
202   }
203 
204   @Override
205   public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
206       Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
207       InternalScanner s) throws IOException {
208     Store.ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
209     if (scanInfo == null) {
210       // take default action
211       return null;
212     }
213     Scan scan = new Scan();
214     scan.setMaxVersions(scanInfo.getMaxVersions());
215     return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion()
216         .getSmallestReadPoint(), earliestPutTs);
217   }
218 
219   @Override
220   public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
221       final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
222       final KeyValueScanner s) throws IOException {
223     Store.ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
224     if (scanInfo == null) {
225       // take default action
226       return null;
227     }
228     return new StoreScanner(store, scanInfo, scan, targetCols);
229   }
230 }