001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor.example; 019 020import java.io.IOException; 021import java.util.Optional; 022import java.util.OptionalLong; 023import org.apache.hadoop.hbase.CoprocessorEnvironment; 024import org.apache.hadoop.hbase.coprocessor.ObserverContext; 025import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 026import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 027import org.apache.hadoop.hbase.coprocessor.RegionObserver; 028import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 029import org.apache.hadoop.hbase.regionserver.ScanOptions; 030import org.apache.hadoop.hbase.regionserver.ScanType; 031import org.apache.hadoop.hbase.regionserver.Store; 032import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 033import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.apache.zookeeper.CreateMode; 038import org.apache.zookeeper.KeeperException; 039import org.apache.zookeeper.KeeperException.NodeExistsException; 040import org.apache.zookeeper.WatchedEvent; 041import org.apache.zookeeper.Watcher; 042import org.apache.zookeeper.ZooDefs; 043import org.apache.zookeeper.ZooKeeper; 044import org.apache.zookeeper.data.Stat; 045 046/** 047 * This is an example showing how a RegionObserver could be configured via ZooKeeper in order to 048 * control a Region compaction, flush, and scan policy. This also demonstrated the use of shared 049 * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} state. See 050 * {@link RegionCoprocessorEnvironment#getSharedData()}. 051 * <p> 052 * This would be useful for an incremental backup tool, which would indicate the last time of a 053 * successful backup via ZK and instruct HBase that to safely delete the data which has already been 054 * backup. 055 */ 056@InterfaceAudience.Private 057public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObserver { 058 059 @Override 060 public Optional<RegionObserver> getRegionObserver() { 061 return Optional.of(this); 062 } 063 064 // The zk ensemble info is put in hbase config xml with given custom key. 065 public static final String ZK_ENSEMBLE_KEY = "ZooKeeperScanPolicyObserver.zookeeper.ensemble"; 066 public static final String ZK_SESSION_TIMEOUT_KEY = 067 "ZooKeeperScanPolicyObserver.zookeeper.session.timeout"; 068 public static final int ZK_SESSION_TIMEOUT_DEFAULT = 30 * 1000; // 30 secs 069 public static final String NODE = "/backup/example/lastbackup"; 070 private static final String ZKKEY = "ZK"; 071 072 private ZKDataHolder cache; 073 074 /** 075 * Internal watcher that keep "data" up to date asynchronously. 076 */ 077 private static final class ZKDataHolder implements Watcher { 078 079 private final String ensemble; 080 081 private final int sessionTimeout; 082 083 private ZooKeeper zk; 084 085 private int ref; 086 087 private byte[] data; 088 089 public ZKDataHolder(String ensemble, int sessionTimeout) { 090 this.ensemble = ensemble; 091 this.sessionTimeout = sessionTimeout; 092 } 093 094 private void open() throws IOException { 095 if (zk == null) { 096 zk = new ZooKeeper(ensemble, sessionTimeout, this); 097 // In a real application, you'd probably want to create these Znodes externally, 098 // and not from the coprocessor 099 StringBuffer createdPath = new StringBuffer(); 100 byte[] empty = new byte[0]; 101 for (String element : NODE.split("/")) { 102 if (element.isEmpty()) { 103 continue; 104 } 105 try { 106 createdPath = createdPath.append("/").append(element); 107 zk.create(createdPath.toString(), empty, ZooDefs.Ids.OPEN_ACL_UNSAFE, 108 CreateMode.PERSISTENT); 109 } catch (NodeExistsException e) { 110 // That's OK 111 } catch (KeeperException e) { 112 throw new IOException(e); 113 } catch (InterruptedException e) { 114 // Restore interrupt status 115 Thread.currentThread().interrupt(); 116 } 117 } 118 } 119 } 120 121 private void close() { 122 if (zk != null) { 123 try { 124 zk.close(); 125 zk = null; 126 } catch (InterruptedException e) { 127 // Restore interrupt status 128 Thread.currentThread().interrupt(); 129 } 130 } 131 } 132 133 public synchronized byte[] getData() { 134 if (ref == 0) { 135 Stat stat = null; 136 try { 137 stat = zk.exists(NODE, this); 138 } catch (KeeperException e) { 139 // Value will always be null if the initial connection fails. 140 // In a real application you probably want to try to 141 // periodically re-connect in this case. 142 } catch (InterruptedException e) { 143 // Restore interrupt status 144 Thread.currentThread().interrupt(); 145 } 146 if (stat != null) { 147 refresh(); 148 } 149 } 150 ref++; 151 return data; 152 } 153 154 private synchronized void refresh() { 155 try { 156 data = zk.getData(NODE, this, null); 157 } catch (KeeperException e) { 158 // Value will always be null if this fails (as we cannot set the new watcher) 159 // In a real application you probably want to try to 160 // periodically re-connect in this case. 161 } catch (InterruptedException e) { 162 // Restore interrupt status 163 Thread.currentThread().interrupt(); 164 } 165 } 166 167 @Override 168 public void process(WatchedEvent event) { 169 refresh(); 170 } 171 } 172 173 @Override 174 public void start(CoprocessorEnvironment env) throws IOException { 175 RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env; 176 try { 177 this.cache = ((ZKDataHolder) renv.getSharedData().computeIfAbsent(ZKKEY, k -> { 178 String ensemble = renv.getConfiguration().get(ZK_ENSEMBLE_KEY); 179 int sessionTimeout = 180 renv.getConfiguration().getInt(ZK_SESSION_TIMEOUT_KEY, ZK_SESSION_TIMEOUT_DEFAULT); 181 return new ZKDataHolder(ensemble, sessionTimeout); 182 })); 183 cache.open(); 184 } catch (Exception e) { 185 throw new IOException(e); 186 } 187 } 188 189 @Override 190 public void stop(CoprocessorEnvironment env) throws IOException { 191 RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env; 192 this.cache = null; 193 ((ZKDataHolder) renv.getSharedData().get(ZKKEY)).close(); 194 } 195 196 private OptionalLong getExpireBefore() { 197 byte[] bytes = cache.getData(); 198 if (bytes == null || bytes.length != Long.BYTES) { 199 return OptionalLong.empty(); 200 } 201 return OptionalLong.of(Bytes.toLong(bytes)); 202 } 203 204 private void resetTTL(ScanOptions options) { 205 OptionalLong expireBefore = getExpireBefore(); 206 if (!expireBefore.isPresent()) { 207 return; 208 } 209 options.setTTL(EnvironmentEdgeManager.currentTime() - expireBefore.getAsLong()); 210 } 211 212 @Override 213 public void preFlushScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c, 214 Store store, ScanOptions options, FlushLifeCycleTracker tracker) throws IOException { 215 resetTTL(options); 216 } 217 218 @Override 219 public void preCompactScannerOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c, 220 Store store, ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker, 221 CompactionRequest request) throws IOException { 222 resetTTL(options); 223 } 224}