001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.zksyncer;
020
021import java.io.IOException;
022import java.util.Collection;
023import java.util.HashMap;
024import java.util.Map;
025import java.util.concurrent.ArrayBlockingQueue;
026import java.util.concurrent.BlockingQueue;
027
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.Server;
030import org.apache.hadoop.hbase.util.Threads;
031import org.apache.hadoop.hbase.zookeeper.ZKListener;
032import org.apache.hadoop.hbase.zookeeper.ZKUtil;
033import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.apache.zookeeper.CreateMode;
036import org.apache.zookeeper.KeeperException;
037
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * Tracks the target znode(s) on server ZK cluster and synchronize them to client ZK cluster if
043 * changed
044 * <p/>
045 * The target znode(s) is given through {@link #getNodesToWatch()} method
046 */
047@InterfaceAudience.Private
048public abstract class ClientZKSyncer extends ZKListener {
049  private static final Logger LOG = LoggerFactory.getLogger(ClientZKSyncer.class);
050  private final Server server;
051  private final ZKWatcher clientZkWatcher;
052  // We use queues and daemon threads to synchronize the data to client ZK cluster
053  // to avoid blocking the single event thread for watchers
054  private final Map<String, BlockingQueue<byte[]>> queues;
055
056  public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
057    super(watcher);
058    this.server = server;
059    this.clientZkWatcher = clientZkWatcher;
060    this.queues = new HashMap<>();
061  }
062
063  /**
064   * Starts the syncer
065   * @throws KeeperException if error occurs when trying to create base nodes on client ZK
066   */
067  public void start() throws KeeperException {
068    LOG.debug("Starting " + getClass().getSimpleName());
069    this.watcher.registerListener(this);
070    // create base znode on remote ZK
071    ZKUtil.createWithParents(clientZkWatcher, watcher.getZNodePaths().baseZNode);
072    // set meta znodes for client ZK
073    Collection<String> nodes = getNodesToWatch();
074    LOG.debug("Znodes to watch: " + nodes);
075    // initialize queues and threads
076    for (String node : nodes) {
077      BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1);
078      queues.put(node, queue);
079      Thread updater = new ClientZkUpdater(node, queue);
080      updater.setDaemon(true);
081      updater.start();
082      watchAndCheckExists(node);
083    }
084  }
085
086  private void watchAndCheckExists(String node) {
087    try {
088      if (ZKUtil.watchAndCheckExists(watcher, node)) {
089        byte[] data = ZKUtil.getDataAndWatch(watcher, node);
090        if (data != null) {
091          // put the data into queue
092          upsertQueue(node, data);
093        } else {
094          // It existed but now does not, should has been tracked by our watcher, ignore
095          LOG.debug("Found no data from " + node);
096          watchAndCheckExists(node);
097        }
098      } else {
099        // cleanup stale ZNodes on client ZK to avoid invalid requests to server
100        ZKUtil.deleteNodeFailSilent(clientZkWatcher, node);
101      }
102    } catch (KeeperException e) {
103      server.abort("Unexpected exception during initialization, aborting", e);
104    }
105  }
106
107  /**
108   * Update the value of the single element in queue if any, or else insert.
109   * <p/>
110   * We only need to synchronize the latest znode value to client ZK rather than synchronize each
111   * time
112   * @param data the data to write to queue
113   */
114  private void upsertQueue(String node, byte[] data) {
115    BlockingQueue<byte[]> queue = queues.get(node);
116    synchronized (queue) {
117      queue.poll();
118      queue.offer(data);
119    }
120  }
121
122  /**
123   * Set data for client ZK and retry until succeed. Be very careful to prevent dead loop when
124   * modifying this method
125   * @param node the znode to set on client ZK
126   * @param data the data to set to client ZK
127   * @throws InterruptedException if the thread is interrupted during process
128   */
129  private final void setDataForClientZkUntilSuccess(String node, byte[] data)
130      throws InterruptedException {
131    while (!server.isStopped()) {
132      try {
133        LOG.debug("Set data for remote " + node + ", client zk wather: " + clientZkWatcher);
134        ZKUtil.setData(clientZkWatcher, node, data);
135        break;
136      } catch (KeeperException.NoNodeException nne) {
137        // Node doesn't exist, create it and set value
138        try {
139          ZKUtil.createNodeIfNotExistsNoWatch(clientZkWatcher, node, data, CreateMode.PERSISTENT);
140          break;
141        } catch (KeeperException.ConnectionLossException
142            | KeeperException.SessionExpiredException ee) {
143          reconnectAfterExpiration();
144        } catch (KeeperException e) {
145          LOG.warn(
146            "Failed to create znode " + node + " due to: " + e.getMessage() + ", will retry later");
147        }
148      } catch (KeeperException.ConnectionLossException
149          | KeeperException.SessionExpiredException ee) {
150        reconnectAfterExpiration();
151      } catch (KeeperException e) {
152        LOG.debug("Failed to set data to client ZK, will retry later", e);
153      }
154      Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
155    }
156  }
157
158  private final void reconnectAfterExpiration() throws InterruptedException {
159    LOG.warn("ZK session expired or lost. Retry a new connection...");
160    try {
161      clientZkWatcher.reconnectAfterExpiration();
162    } catch (IOException | KeeperException e) {
163      LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", e);
164    }
165  }
166
167  @Override
168  public void nodeCreated(String path) {
169    if (!validate(path)) {
170      return;
171    }
172    try {
173      byte[] data = ZKUtil.getDataAndWatch(watcher, path);
174      upsertQueue(path, data);
175    } catch (KeeperException e) {
176      LOG.warn("Unexpected exception handling nodeCreated event", e);
177    }
178  }
179
180  @Override
181  public void nodeDataChanged(String path) {
182    if (validate(path)) {
183      nodeCreated(path);
184    }
185  }
186
187  @Override
188  public synchronized void nodeDeleted(String path) {
189    if (validate(path)) {
190      try {
191        if (ZKUtil.watchAndCheckExists(watcher, path)) {
192          nodeCreated(path);
193        }
194      } catch (KeeperException e) {
195        LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, e);
196      }
197    }
198  }
199
200  /**
201   * Validate whether a znode path is watched by us
202   * @param path the path to validate
203   * @return true if the znode is watched by us
204   */
205  abstract boolean validate(String path);
206
207  /**
208   * @return the znode(s) to watch
209   */
210  abstract Collection<String> getNodesToWatch();
211
212  /**
213   * Thread to synchronize znode data to client ZK cluster
214   */
215  class ClientZkUpdater extends Thread {
216    final String znode;
217    final BlockingQueue<byte[]> queue;
218
219    public ClientZkUpdater(String znode, BlockingQueue<byte[]> queue) {
220      this.znode = znode;
221      this.queue = queue;
222      setName("ClientZKUpdater-" + znode);
223    }
224
225    @Override
226    public void run() {
227      while (!server.isStopped()) {
228        try {
229          byte[] data = queue.take();
230          setDataForClientZkUntilSuccess(znode, data);
231        } catch (InterruptedException e) {
232          if (LOG.isDebugEnabled()) {
233            LOG.debug(
234              "Interrupted while checking whether need to update meta location to client zk");
235          }
236          Thread.currentThread().interrupt();
237          break;
238        }
239      }
240    }
241  }
242}