1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coprocessor.example;
20
21 import java.io.IOException;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.NavigableSet;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.CoprocessorEnvironment;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.client.IsolationLevel;
31 import org.apache.hadoop.hbase.client.Scan;
32 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
33 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
34 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
35 import org.apache.hadoop.hbase.regionserver.HStore;
36 import org.apache.hadoop.hbase.regionserver.Store;
37 import org.apache.hadoop.hbase.regionserver.ScanInfo;
38 import org.apache.hadoop.hbase.regionserver.InternalScanner;
39 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
40 import org.apache.hadoop.hbase.regionserver.ScanType;
41 import org.apache.hadoop.hbase.regionserver.Store;
42 import org.apache.hadoop.hbase.regionserver.StoreScanner;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
45 import org.apache.zookeeper.KeeperException;
46 import org.apache.zookeeper.WatchedEvent;
47 import org.apache.zookeeper.Watcher;
48 import org.apache.zookeeper.ZooKeeper;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public class ZooKeeperScanPolicyObserver extends BaseRegionObserver {
68 public static final String node = "/backup/example/lastbackup";
69 public static final String zkkey = "ZK";
70 private static final Log LOG = LogFactory.getLog(ZooKeeperScanPolicyObserver.class);
71
72
73
74
75 private static class ZKWatcher implements Watcher {
76 private byte[] data = null;
77 private ZooKeeper zk;
78 private volatile boolean needSetup = true;
79 private volatile long lastSetupTry = 0;
80
81 public ZKWatcher(ZooKeeper zk) {
82 this.zk = zk;
83
84 getData();
85 }
86
87
88
89
90
91
92
93
94
95
96
97
98 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION")
99 public byte[] getData() {
100
101 if (needSetup && EnvironmentEdgeManager.currentTime() > lastSetupTry + 30000) {
102 synchronized (this) {
103
104 if (needSetup) {
105 needSetup = false;
106 } else {
107 return data;
108 }
109 }
110
111
112 try {
113 LOG.debug("Connecting to ZK");
114
115 lastSetupTry = EnvironmentEdgeManager.currentTime();
116 if (zk.exists(node, false) != null) {
117 data = zk.getData(node, this, null);
118 LOG.debug("Read synchronously: "+(data == null ? "null" : Bytes.toLong(data)));
119 } else {
120 zk.exists(node, this);
121 }
122 } catch (Exception x) {
123
124 needSetup = true;
125 }
126 }
127 return data;
128 }
129
130 @Override
131 public void process(WatchedEvent event) {
132 switch(event.getType()) {
133 case NodeDataChanged:
134 case NodeCreated:
135 try {
136
137 data = zk.getData(node, this, null);
138 LOG.debug("Read asynchronously: "+(data == null ? "null" : Bytes.toLong(data)));
139 } catch (InterruptedException ix) {
140 } catch (KeeperException kx) {
141 needSetup = true;
142 }
143 break;
144
145 case NodeDeleted:
146 try {
147
148 zk.exists(node, this);
149 data = null;
150 } catch (InterruptedException ix) {
151 } catch (KeeperException kx) {
152 needSetup = true;
153 }
154 break;
155
156 default:
157
158 }
159 }
160 }
161
162 @Override
163 public void start(CoprocessorEnvironment e) throws IOException {
164 RegionCoprocessorEnvironment re = (RegionCoprocessorEnvironment) e;
165 if (!re.getSharedData().containsKey(zkkey)) {
166
167
168 re.getSharedData().putIfAbsent(
169 zkkey,
170 new ZKWatcher(re.getRegionServerServices().getZooKeeper()
171 .getRecoverableZooKeeper().getZooKeeper()));
172 }
173 }
174
175 @Override
176 public void stop(CoprocessorEnvironment e) throws IOException {
177
178 }
179
180 protected ScanInfo getScanInfo(Store store, RegionCoprocessorEnvironment e) {
181 byte[] data = ((ZKWatcher)e.getSharedData().get(zkkey)).getData();
182 if (data == null) {
183 return null;
184 }
185 ScanInfo oldSI = store.getScanInfo();
186 if (oldSI.getTtl() == Long.MAX_VALUE) {
187 return null;
188 }
189 long ttl = Math.max(EnvironmentEdgeManager.currentTime() -
190 Bytes.toLong(data), oldSI.getTtl());
191 return new ScanInfo(oldSI.getConfiguration(), store.getFamily(), ttl,
192 oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
193 }
194
195 @Override
196 public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
197 Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
198 ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
199 if (scanInfo == null) {
200
201 return null;
202 }
203 Scan scan = new Scan();
204 scan.setMaxVersions(scanInfo.getMaxVersions());
205 return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
206 ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
207 }
208
209 @Override
210 public InternalScanner preCompactScannerOpen(
211 final ObserverContext<RegionCoprocessorEnvironment> c,
212 Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
213 InternalScanner s) throws IOException {
214 ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
215 if (scanInfo == null) {
216
217 return null;
218 }
219 Scan scan = new Scan();
220 scan.setMaxVersions(scanInfo.getMaxVersions());
221 return new StoreScanner(store, scanInfo, scan, scanners, scanType,
222 store.getSmallestReadPoint(), earliestPutTs);
223 }
224
225 @Override
226 public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
227 final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
228 final KeyValueScanner s) throws IOException {
229 ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
230 if (scanInfo == null) {
231
232 return null;
233 }
234 return new StoreScanner(store, scanInfo, scan, targetCols,
235 ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
236 }
237 }