/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor.example;

import java.io.IOException;
import java.util.Optional;
import java.util.OptionalLong;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryForever;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * This is an example showing how a RegionObserver could be configured via ZooKeeper in order to
 * control a Region compaction, flush, and scan policy. This also demonstrates the use of shared
 * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} state. See
 * {@link RegionCoprocessorEnvironment#getSharedData()}.
 * <p>
 * This would be useful for an incremental backup tool, which would indicate the last time of a
 * successful backup via ZK and instruct HBase to safely delete the data which has already been
 * backed up.
 */
@InterfaceAudience.Private
public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObserver {

  /**
   * Exposes this coprocessor as its own {@link RegionObserver}.
   * @return an {@link Optional} containing this instance
   */
  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  // The zk ensemble info is put in hbase config xml with given custom key.
  public static final String ZK_ENSEMBLE_KEY = "ZooKeeperScanPolicyObserver.zookeeper.ensemble";
  public static final String ZK_SESSION_TIMEOUT_KEY =
    "ZooKeeperScanPolicyObserver.zookeeper.session.timeout";
  // Passed to the Curator builder's sessionTimeoutMs(), hence expressed in milliseconds.
  public static final int ZK_SESSION_TIMEOUT_DEFAULT = 30 * 1000; // 30 secs
  // Path of the znode whose content is read by getExpireBefore().
  public static final String NODE = "/backup/example/lastbackup";
  // Key under which the ZKDataHolder is stored in the environment's shared data map.
  private static final String ZKKEY = "ZK";

  // Set by start() from the shared ZKDataHolder and reset to null by stop().
  private NodeCache cache;

  /**
   * Internal watcher that keeps "data" up to date asynchronously.
   * <p>
   * Holds the Curator client and the {@link NodeCache} on {@link #NODE} together with a reference
   * count: the pair is created by the {@link #acquire()} call that finds the count at zero and is
   * closed by the {@link #release()} call that brings the count back to zero.
   */
  private static final class ZKDataHolder {

    private final String ensemble;

    private final int sessionTimeout;

    private CuratorFramework client;

    private NodeCache cache;

    // Number of acquire() calls not yet matched by a release(); only touched by the
    // synchronized acquire()/release() methods.
    private int ref;

    /**
     * @param ensemble ZooKeeper connect string handed to the Curator builder
     * @param sessionTimeout ZooKeeper session timeout in milliseconds
     */
    public ZKDataHolder(String ensemble, int sessionTimeout) {
      this.ensemble = ensemble;
      this.sessionTimeout = sessionTimeout;
    }

    /**
     * Builds and starts the Curator client, then creates and starts the {@link NodeCache} on
     * {@link #NODE}.
     * @throws Exception if starting the node cache fails
     */
    private void create() throws Exception {
      client =
        CuratorFrameworkFactory.builder().connectString(ensemble).sessionTimeoutMs(sessionTimeout)
          .retryPolicy(new RetryForever(1000)).canBeReadOnly(true).build();
      client.start();
      cache = new NodeCache(client, NODE);
      cache.start(true);
    }

    /**
     * Closes the node cache and the client, if present, and clears both fields.
     * <p>
     * The client is closed in a {@code finally} block so that it is not leaked when closing the
     * node cache fails; in that case the {@link AssertionError} still propagates to the caller.
     */
    private void close() {
      try {
        if (cache != null) {
          try {
            cache.close();
          } catch (IOException e) {
            // should not happen
            throw new AssertionError(e);
          } finally {
            // Drop the reference even on failure so a later acquire() starts from a clean state.
            cache = null;
          }
        }
      } finally {
        if (client != null) {
          client.close();
          client = null;
        }
      }
    }

    /**
     * Increments the reference count and returns the shared node cache, creating the client and
     * the cache first when the count is zero.
     * @return the shared {@link NodeCache}
     * @throws Exception if creation fails; whatever was partially created is closed and the
     *           reference count is left unchanged
     */
    public synchronized NodeCache acquire() throws Exception {
      if (ref == 0) {
        try {
          create();
        } catch (Exception e) {
          close();
          throw e;
        }
      }
      ref++;
      return cache;
    }

    /**
     * Decrements the reference count and closes the client and the cache once it reaches zero.
     */
    public synchronized void release() {
      ref--;
      if (ref == 0) {
        close();
      }
    }
  }

  /**
   * Looks up the {@link ZKDataHolder} stored under {@link #ZKKEY} in the environment's shared
   * data, creating it from the {@link #ZK_ENSEMBLE_KEY} and {@link #ZK_SESSION_TIMEOUT_KEY}
   * configuration values if absent, and acquires its node cache.
   * @param env the coprocessor environment; cast to {@link RegionCoprocessorEnvironment}
   * @throws IOException wrapping any exception raised while obtaining the node cache
   */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
    try {
      this.cache = ((ZKDataHolder) renv.getSharedData().computeIfAbsent(ZKKEY, k -> {
        String ensemble = renv.getConfiguration().get(ZK_ENSEMBLE_KEY);
        int sessionTimeout =
          renv.getConfiguration().getInt(ZK_SESSION_TIMEOUT_KEY, ZK_SESSION_TIMEOUT_DEFAULT);
        return new ZKDataHolder(ensemble, sessionTimeout);
      })).acquire();
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /**
   * Drops this observer's node cache reference and releases the shared {@link ZKDataHolder}.
   * @param env the coprocessor environment; cast to {@link RegionCoprocessorEnvironment}
   */
  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env;
    this.cache = null;
    ((ZKDataHolder) renv.getSharedData().get(ZKKEY)).release();
  }

  /**
   * Reads the timestamp currently held by the node cache.
   * @return the node content decoded as a long, or empty when the cache has no current data or
   *         the payload is null or not exactly {@link Long#BYTES} bytes long
   */
  private OptionalLong getExpireBefore() {
    ChildData data = cache.getCurrentData();
    if (data == null) {
      return OptionalLong.empty();
    }
    byte[] bytes = data.getData();
    if (bytes == null || bytes.length != Long.BYTES) {
      return OptionalLong.empty();
    }
    return OptionalLong.of(Bytes.toLong(bytes));
  }

  /**
   * Sets the TTL of the given scan options to the current time minus the timestamp read from
   * ZooKeeper. The options are left untouched when no timestamp is available.
   * @param options the scan options to adjust
   */
  private void resetTTL(ScanOptions options) {
    OptionalLong expireBefore = getExpireBefore();
    if (!expireBefore.isPresent()) {
      return;
    }
    options.setTTL(EnvironmentEdgeManager.currentTime() - expireBefore.getAsLong());
  }

  /**
   * Applies the ZooKeeper-driven TTL to the scan options before a flush scanner is opened.
   */
  @Override
  public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
    resetTTL(options);
  }

  /**
   * Applies the ZooKeeper-driven TTL to the scan options before a compaction scanner is opened.
   */
  @Override
  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
    ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
    CompactionRequest request) throws IOException {
    resetTTL(options);
  }
}