001/** 002 * Copyright The Apache Software Foundation 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one or more 005 * contributor license agreements. See the NOTICE file distributed with this 006 * work for additional information regarding copyright ownership. The ASF 007 * licenses this file to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance with the License. 009 * You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 015 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 016 * License for the specific language governing permissions and limitations 017 * under the License. 018 */ 019package org.apache.hadoop.hbase.coprocessor.example; 020 021import java.io.IOException; 022import java.util.Optional; 023import java.util.OptionalLong; 024import org.apache.curator.framework.CuratorFramework; 025import org.apache.curator.framework.CuratorFrameworkFactory; 026import org.apache.curator.framework.recipes.cache.ChildData; 027import org.apache.curator.framework.recipes.cache.NodeCache; 028import org.apache.curator.retry.RetryForever; 029import org.apache.hadoop.hbase.CoprocessorEnvironment; 030import org.apache.hadoop.hbase.coprocessor.ObserverContext; 031import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 032import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 033import org.apache.hadoop.hbase.coprocessor.RegionObserver; 034import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 035import org.apache.hadoop.hbase.regionserver.ScanOptions; 036import org.apache.hadoop.hbase.regionserver.ScanType; 037import org.apache.hadoop.hbase.regionserver.Store; 038import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 039import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 042import org.apache.yetus.audience.InterfaceAudience; 043 044/** 045 * This is an example showing how a RegionObserver could configured via ZooKeeper in order to 046 * control a Region compaction, flush, and scan policy. This also demonstrated the use of shared 047 * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver} state. See 048 * {@link RegionCoprocessorEnvironment#getSharedData()}. 049 * <p> 050 * This would be useful for an incremental backup tool, which would indicate the last time of a 051 * successful backup via ZK and instruct HBase that to safely delete the data which has already been 052 * backup. 053 */ 054@InterfaceAudience.Private 055public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObserver { 056 057 @Override 058 public Optional<RegionObserver> getRegionObserver() { 059 return Optional.of(this); 060 } 061 062 // The zk ensemble info is put in hbase config xml with given custom key. 063 public static final String ZK_ENSEMBLE_KEY = "ZooKeeperScanPolicyObserver.zookeeper.ensemble"; 064 public static final String ZK_SESSION_TIMEOUT_KEY = 065 "ZooKeeperScanPolicyObserver.zookeeper.session.timeout"; 066 public static final int ZK_SESSION_TIMEOUT_DEFAULT = 30 * 1000; // 30 secs 067 public static final String NODE = "/backup/example/lastbackup"; 068 private static final String ZKKEY = "ZK"; 069 070 private NodeCache cache; 071 072 /** 073 * Internal watcher that keep "data" up to date asynchronously. 074 */ 075 private static final class ZKDataHolder { 076 077 private final String ensemble; 078 079 private final int sessionTimeout; 080 081 private CuratorFramework client; 082 083 private NodeCache cache; 084 085 private int ref; 086 087 public ZKDataHolder(String ensemble, int sessionTimeout) { 088 this.ensemble = ensemble; 089 this.sessionTimeout = sessionTimeout; 090 } 091 092 private void create() throws Exception { 093 client = 094 CuratorFrameworkFactory.builder().connectString(ensemble).sessionTimeoutMs(sessionTimeout) 095 .retryPolicy(new RetryForever(1000)).canBeReadOnly(true).build(); 096 client.start(); 097 cache = new NodeCache(client, NODE); 098 cache.start(true); 099 } 100 101 private void close() { 102 if (cache != null) { 103 try { 104 cache.close(); 105 } catch (IOException e) { 106 // should not happen 107 throw new AssertionError(e); 108 } 109 cache = null; 110 } 111 if (client != null) { 112 client.close(); 113 client = null; 114 } 115 } 116 117 public synchronized NodeCache acquire() throws Exception { 118 if (ref == 0) { 119 try { 120 create(); 121 } catch (Exception e) { 122 close(); 123 throw e; 124 } 125 } 126 ref++; 127 return cache; 128 } 129 130 public synchronized void release() { 131 ref--; 132 if (ref == 0) { 133 close(); 134 } 135 } 136 } 137 138 @Override 139 public void start(CoprocessorEnvironment env) throws IOException { 140 RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env; 141 try { 142 this.cache = ((ZKDataHolder) renv.getSharedData().computeIfAbsent(ZKKEY, k -> { 143 String ensemble = renv.getConfiguration().get(ZK_ENSEMBLE_KEY); 144 int sessionTimeout = 145 renv.getConfiguration().getInt(ZK_SESSION_TIMEOUT_KEY, ZK_SESSION_TIMEOUT_DEFAULT); 146 return new ZKDataHolder(ensemble, sessionTimeout); 147 })).acquire(); 148 } catch (Exception e) { 149 throw new IOException(e); 150 } 151 } 152 153 @Override 154 public void stop(CoprocessorEnvironment env) throws IOException { 155 RegionCoprocessorEnvironment renv = (RegionCoprocessorEnvironment) env; 156 this.cache = null; 157 ((ZKDataHolder) renv.getSharedData().get(ZKKEY)).release(); 158 } 159 160 private OptionalLong getExpireBefore() { 161 ChildData data = cache.getCurrentData(); 162 if (data == null) { 163 return OptionalLong.empty(); 164 } 165 byte[] bytes = data.getData(); 166 if (bytes == null || bytes.length != Long.BYTES) { 167 return OptionalLong.empty(); 168 } 169 return OptionalLong.of(Bytes.toLong(bytes)); 170 } 171 172 private void resetTTL(ScanOptions options) { 173 OptionalLong expireBefore = getExpireBefore(); 174 if (!expireBefore.isPresent()) { 175 return; 176 } 177 options.setTTL(EnvironmentEdgeManager.currentTime() - expireBefore.getAsLong()); 178 } 179 180 @Override 181 public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 182 ScanOptions options, FlushLifeCycleTracker tracker) throws IOException { 183 resetTTL(options); 184 } 185 186 @Override 187 public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 188 ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker, 189 CompactionRequest request) throws IOException { 190 resetTTL(options); 191 } 192}