001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.zksyncer;
019
020import java.io.IOException;
021import java.util.Iterator;
022import java.util.Map;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.Server;
028import org.apache.hadoop.hbase.util.Threads;
029import org.apache.hadoop.hbase.zookeeper.ZKListener;
030import org.apache.hadoop.hbase.zookeeper.ZKUtil;
031import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.apache.zookeeper.CreateMode;
034import org.apache.zookeeper.KeeperException;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Tracks the target znode(s) on server ZK cluster and synchronize them to client ZK cluster if
040 * changed
041 * <p/>
042 * The target znode(s) is given through {@link #getPathsToWatch()} method
043 */
044@InterfaceAudience.Private
045public abstract class ClientZKSyncer extends ZKListener {
046  private static final Logger LOG = LoggerFactory.getLogger(ClientZKSyncer.class);
047  private final Server server;
048  private final ZKWatcher clientZkWatcher;
049
050  /**
051   * Used to store the newest data which we want to sync to client zk.
052   * <p/>
053   * For meta location, since we may reduce the replica number, so here we add a {@code delete} flag
054   * to tell the updater delete the znode on client zk and quit.
055   */
056  private static final class ZKData {
057
058    byte[] data;
059
060    boolean delete = false;
061
062    synchronized void set(byte[] data) {
063      this.data = data;
064      notifyAll();
065    }
066
067    synchronized byte[] get() throws InterruptedException {
068      while (!delete && data == null) {
069        wait();
070      }
071      byte[] d = data;
072      data = null;
073      return d;
074    }
075
076    synchronized void delete() {
077      this.delete = true;
078      notifyAll();
079    }
080
081    synchronized boolean isDeleted() {
082      return delete;
083    }
084  }
085
086  // We use queues and daemon threads to synchronize the data to client ZK cluster
087  // to avoid blocking the single event thread for watchers
088  private final ConcurrentMap<String, ZKData> queues;
089
090  public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
091    super(watcher);
092    this.server = server;
093    this.clientZkWatcher = clientZkWatcher;
094    this.queues = new ConcurrentHashMap<>();
095  }
096
097  private void startNewSyncThread(String path) {
098    ZKData zkData = new ZKData();
099    queues.put(path, zkData);
100    Thread updater = new ClientZkUpdater(path, zkData);
101    updater.setDaemon(true);
102    updater.start();
103    watchAndCheckExists(path);
104  }
105
106  /**
107   * Starts the syncer
108   * @throws KeeperException if error occurs when trying to create base nodes on client ZK
109   */
110  public void start() throws KeeperException {
111    LOG.debug("Starting " + getClass().getSimpleName());
112    this.watcher.registerListener(this);
113    // create base znode on remote ZK
114    ZKUtil.createWithParents(clientZkWatcher, watcher.getZNodePaths().baseZNode);
115    // set znodes for client ZK
116    Set<String> paths = getPathsToWatch();
117    LOG.debug("ZNodes to watch: {}", paths);
118    // initialize queues and threads
119    for (String path : paths) {
120      startNewSyncThread(path);
121    }
122  }
123
124  private void watchAndCheckExists(String node) {
125    try {
126      if (ZKUtil.watchAndCheckExists(watcher, node)) {
127        byte[] data = ZKUtil.getDataAndWatch(watcher, node);
128        if (data != null) {
129          // put the data into queue
130          upsertQueue(node, data);
131        } else {
132          // It existed but now does not, should has been tracked by our watcher, ignore
133          LOG.debug("Found no data from " + node);
134          watchAndCheckExists(node);
135        }
136      } else {
137        // cleanup stale ZNodes on client ZK to avoid invalid requests to server
138        ZKUtil.deleteNodeFailSilent(clientZkWatcher, node);
139      }
140    } catch (KeeperException e) {
141      server.abort("Unexpected exception during initialization, aborting", e);
142    }
143  }
144
145  /**
146   * Update the value of the single element in queue if any, or else insert.
147   * <p/>
148   * We only need to synchronize the latest znode value to client ZK rather than synchronize each
149   * time
150   * @param data the data to write to queue
151   */
152  private void upsertQueue(String node, byte[] data) {
153    ZKData zkData = queues.get(node);
154    if (zkData != null) {
155      zkData.set(data);
156    }
157  }
158
159  /**
160   * Set data for client ZK and retry until succeed. Be very careful to prevent dead loop when
161   * modifying this method
162   * @param node the znode to set on client ZK
163   * @param data the data to set to client ZK
164   * @throws InterruptedException if the thread is interrupted during process
165   */
166  private void setDataForClientZkUntilSuccess(String node, byte[] data)
167    throws InterruptedException {
168    boolean create = false;
169    while (!server.isStopped()) {
170      try {
171        LOG.debug("Set data for remote " + node + ", client zk wather: " + clientZkWatcher);
172        if (create) {
173          ZKUtil.createNodeIfNotExistsNoWatch(clientZkWatcher, node, data, CreateMode.PERSISTENT);
174        } else {
175          ZKUtil.setData(clientZkWatcher, node, data);
176        }
177        break;
178      } catch (KeeperException e) {
179        LOG.debug("Failed to set data for {} to client ZK, will retry later", node, e);
180        if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
181          reconnectAfterExpiration();
182        }
183        if (e.code() == KeeperException.Code.NONODE) {
184          create = true;
185        }
186        if (e.code() == KeeperException.Code.NODEEXISTS) {
187          create = false;
188        }
189      }
190      Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
191    }
192  }
193
194  private void deleteDataForClientZkUntilSuccess(String node) throws InterruptedException {
195    while (!server.isStopped()) {
196      LOG.debug("Delete remote " + node + ", client zk wather: " + clientZkWatcher);
197      try {
198        ZKUtil.deleteNode(clientZkWatcher, node);
199        break;
200      } catch (KeeperException e) {
201        if (e.code() == KeeperException.Code.NONODE) {
202          LOG.debug("Node is already deleted, give up", e);
203          break;
204        }
205        LOG.debug("Failed to delete node from client ZK, will retry later", e);
206        if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
207          reconnectAfterExpiration();
208        }
209      }
210    }
211  }
212
213  private final void reconnectAfterExpiration() throws InterruptedException {
214    LOG.warn("ZK session expired or lost. Retry a new connection...");
215    try {
216      clientZkWatcher.reconnectAfterExpiration();
217    } catch (IOException | KeeperException e) {
218      LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", e);
219    }
220  }
221
222  private void getDataAndWatch(String path) {
223    try {
224      byte[] data = ZKUtil.getDataAndWatch(watcher, path);
225      upsertQueue(path, data);
226    } catch (KeeperException e) {
227      LOG.warn("Unexpected exception handling nodeCreated event", e);
228    }
229  }
230
231  private void removeQueue(String path) {
232    ZKData zkData = queues.remove(path);
233    if (zkData != null) {
234      zkData.delete();
235    }
236  }
237
238  @Override
239  public void nodeCreated(String path) {
240    if (validate(path)) {
241      getDataAndWatch(path);
242    } else {
243      removeQueue(path);
244    }
245  }
246
247  @Override
248  public void nodeDataChanged(String path) {
249    nodeCreated(path);
250  }
251
252  @Override
253  public synchronized void nodeDeleted(String path) {
254    if (validate(path)) {
255      try {
256        if (ZKUtil.watchAndCheckExists(watcher, path)) {
257          getDataAndWatch(path);
258        }
259      } catch (KeeperException e) {
260        LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, e);
261      }
262    } else {
263      removeQueue(path);
264    }
265  }
266
267  /**
268   * Validate whether a znode path is watched by us
269   * @param path the path to validate
270   * @return true if the znode is watched by us
271   */
272  protected abstract boolean validate(String path);
273
274  /** Returns the zk path(s) to watch */
275  protected abstract Set<String> getPathsToWatch();
276
277  protected final void refreshWatchingList() {
278    Set<String> newPaths = getPathsToWatch();
279    LOG.debug("New ZNodes to watch: {}", newPaths);
280    Iterator<Map.Entry<String, ZKData>> iter = queues.entrySet().iterator();
281    // stop unused syncers
282    while (iter.hasNext()) {
283      Map.Entry<String, ZKData> entry = iter.next();
284      if (!newPaths.contains(entry.getKey())) {
285        iter.remove();
286        entry.getValue().delete();
287      }
288    }
289    // start new syncers
290    for (String newPath : newPaths) {
291      if (!queues.containsKey(newPath)) {
292        startNewSyncThread(newPath);
293      }
294    }
295  }
296
297  /**
298   * Thread to synchronize znode data to client ZK cluster
299   */
300  private final class ClientZkUpdater extends Thread {
301    private final String znode;
302    private final ZKData zkData;
303
304    public ClientZkUpdater(String znode, ZKData zkData) {
305      this.znode = znode;
306      this.zkData = zkData;
307      setName("ClientZKUpdater-" + znode);
308    }
309
310    @Override
311    public void run() {
312      LOG.debug("Client zk updater for znode {} started", znode);
313      while (!server.isStopped()) {
314        try {
315          byte[] data = zkData.get();
316          if (data != null) {
317            setDataForClientZkUntilSuccess(znode, data);
318          } else {
319            if (zkData.isDeleted()) {
320              deleteDataForClientZkUntilSuccess(znode);
321              break;
322            }
323          }
324        } catch (InterruptedException e) {
325          LOG.debug("Interrupted while checking whether need to update meta location to client zk");
326          Thread.currentThread().interrupt();
327          break;
328        }
329      }
330      LOG.debug("Client zk updater for znode {} stopped", znode);
331    }
332  }
333}