001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.zksyncer;
020
021import java.io.IOException;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.util.Threads;
030import org.apache.hadoop.hbase.zookeeper.ZKListener;
031import org.apache.hadoop.hbase.zookeeper.ZKUtil;
032import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.apache.zookeeper.CreateMode;
035import org.apache.zookeeper.KeeperException;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * Tracks the target znode(s) on server ZK cluster and synchronize them to client ZK cluster if
041 * changed
042 * <p/>
043 * The target znode(s) is given through {@link #getPathsToWatch()} method
044 */
045@InterfaceAudience.Private
046public abstract class ClientZKSyncer extends ZKListener {
047  private static final Logger LOG = LoggerFactory.getLogger(ClientZKSyncer.class);
048  private final Server server;
049  private final ZKWatcher clientZkWatcher;
050
051  /**
052   * Used to store the newest data which we want to sync to client zk.
053   * <p/>
054   * For meta location, since we may reduce the replica number, so here we add a {@code delete} flag
055   * to tell the updater delete the znode on client zk and quit.
056   */
057  private static final class ZKData {
058
059    byte[] data;
060
061    boolean delete = false;
062
063    synchronized void set(byte[] data) {
064      this.data = data;
065      notifyAll();
066    }
067
068    synchronized byte[] get() throws InterruptedException {
069      while (!delete && data == null) {
070        wait();
071      }
072      byte[] d = data;
073      data = null;
074      return d;
075    }
076
077    synchronized void delete() {
078      this.delete = true;
079      notifyAll();
080    }
081
082    synchronized boolean isDeleted() {
083      return delete;
084    }
085  }
086
087  // We use queues and daemon threads to synchronize the data to client ZK cluster
088  // to avoid blocking the single event thread for watchers
089  private final ConcurrentMap<String, ZKData> queues;
090
091  public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
092    super(watcher);
093    this.server = server;
094    this.clientZkWatcher = clientZkWatcher;
095    this.queues = new ConcurrentHashMap<>();
096  }
097
098  private void startNewSyncThread(String path) {
099    ZKData zkData = new ZKData();
100    queues.put(path, zkData);
101    Thread updater = new ClientZkUpdater(path, zkData);
102    updater.setDaemon(true);
103    updater.start();
104    watchAndCheckExists(path);
105  }
106
107  /**
108   * Starts the syncer
109   * @throws KeeperException if error occurs when trying to create base nodes on client ZK
110   */
111  public void start() throws KeeperException {
112    LOG.debug("Starting " + getClass().getSimpleName());
113    this.watcher.registerListener(this);
114    // create base znode on remote ZK
115    ZKUtil.createWithParents(clientZkWatcher, watcher.getZNodePaths().baseZNode);
116    // set znodes for client ZK
117    Set<String> paths = getPathsToWatch();
118    LOG.debug("ZNodes to watch: {}", paths);
119    // initialize queues and threads
120    for (String path : paths) {
121      startNewSyncThread(path);
122    }
123  }
124
125  private void watchAndCheckExists(String node) {
126    try {
127      if (ZKUtil.watchAndCheckExists(watcher, node)) {
128        byte[] data = ZKUtil.getDataAndWatch(watcher, node);
129        if (data != null) {
130          // put the data into queue
131          upsertQueue(node, data);
132        } else {
133          // It existed but now does not, should has been tracked by our watcher, ignore
134          LOG.debug("Found no data from " + node);
135          watchAndCheckExists(node);
136        }
137      } else {
138        // cleanup stale ZNodes on client ZK to avoid invalid requests to server
139        ZKUtil.deleteNodeFailSilent(clientZkWatcher, node);
140      }
141    } catch (KeeperException e) {
142      server.abort("Unexpected exception during initialization, aborting", e);
143    }
144  }
145
146  /**
147   * Update the value of the single element in queue if any, or else insert.
148   * <p/>
149   * We only need to synchronize the latest znode value to client ZK rather than synchronize each
150   * time
151   * @param data the data to write to queue
152   */
153  private void upsertQueue(String node, byte[] data) {
154    ZKData zkData = queues.get(node);
155    if (zkData != null) {
156      zkData.set(data);
157    }
158  }
159
160  /**
161   * Set data for client ZK and retry until succeed. Be very careful to prevent dead loop when
162   * modifying this method
163   * @param node the znode to set on client ZK
164   * @param data the data to set to client ZK
165   * @throws InterruptedException if the thread is interrupted during process
166   */
167  private void setDataForClientZkUntilSuccess(String node, byte[] data)
168    throws InterruptedException {
169    boolean create = false;
170    while (!server.isStopped()) {
171      try {
172        LOG.debug("Set data for remote " + node + ", client zk wather: " + clientZkWatcher);
173        if (create) {
174          ZKUtil.createNodeIfNotExistsNoWatch(clientZkWatcher, node, data, CreateMode.PERSISTENT);
175        } else {
176          ZKUtil.setData(clientZkWatcher, node, data);
177        }
178        break;
179      } catch (KeeperException e) {
180        LOG.debug("Failed to set data for {} to client ZK, will retry later", node, e);
181        if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
182          reconnectAfterExpiration();
183        }
184        if (e.code() == KeeperException.Code.NONODE) {
185          create = true;
186        }
187        if (e.code() == KeeperException.Code.NODEEXISTS) {
188          create = false;
189        }
190      }
191      Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
192    }
193  }
194
195  private void deleteDataForClientZkUntilSuccess(String node) throws InterruptedException {
196    while (!server.isStopped()) {
197      LOG.debug("Delete remote " + node + ", client zk wather: " + clientZkWatcher);
198      try {
199        ZKUtil.deleteNode(clientZkWatcher, node);
200      } catch (KeeperException e) {
201        LOG.debug("Failed to delete node from client ZK, will retry later", e);
202        if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
203          reconnectAfterExpiration();
204        }
205        
206      }
207    }
208  }
209
210  private final void reconnectAfterExpiration() throws InterruptedException {
211    LOG.warn("ZK session expired or lost. Retry a new connection...");
212    try {
213      clientZkWatcher.reconnectAfterExpiration();
214    } catch (IOException | KeeperException e) {
215      LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", e);
216    }
217  }
218
219  private void getDataAndWatch(String path) {
220    try {
221      byte[] data = ZKUtil.getDataAndWatch(watcher, path);
222      upsertQueue(path, data);
223    } catch (KeeperException e) {
224      LOG.warn("Unexpected exception handling nodeCreated event", e);
225    }
226  }
227
228  private void removeQueue(String path) {
229    ZKData zkData = queues.remove(path);
230    if (zkData != null) {
231      zkData.delete();
232    }
233  }
234
235  @Override
236  public void nodeCreated(String path) {
237    if (validate(path)) {
238      getDataAndWatch(path);
239    } else {
240      removeQueue(path);
241    }
242  }
243
244  @Override
245  public void nodeDataChanged(String path) {
246    nodeCreated(path);
247  }
248
249  @Override
250  public synchronized void nodeDeleted(String path) {
251    if (validate(path)) {
252      try {
253        if (ZKUtil.watchAndCheckExists(watcher, path)) {
254          getDataAndWatch(path);
255        }
256      } catch (KeeperException e) {
257        LOG.warn("Unexpected exception handling nodeDeleted event for path: " + path, e);
258      }
259    } else {
260      removeQueue(path);
261    }
262  }
263
264  /**
265   * Validate whether a znode path is watched by us
266   * @param path the path to validate
267   * @return true if the znode is watched by us
268   */
269  protected abstract boolean validate(String path);
270
271  /**
272   * @return the zk path(s) to watch
273   */
274  protected abstract Set<String> getPathsToWatch();
275
276  protected final void refreshWatchingList() {
277    Set<String> newPaths = getPathsToWatch();
278    LOG.debug("New ZNodes to watch: {}", newPaths);
279    Iterator<Map.Entry<String, ZKData>> iter = queues.entrySet().iterator();
280    // stop unused syncers
281    while (iter.hasNext()) {
282      Map.Entry<String, ZKData> entry = iter.next();
283      if (!newPaths.contains(entry.getKey())) {
284        iter.remove();
285        entry.getValue().delete();
286      }
287    }
288    // start new syncers
289    for (String newPath : newPaths) {
290      if (!queues.containsKey(newPath)) {
291        startNewSyncThread(newPath);
292      }
293    }
294  }
295
296  /**
297   * Thread to synchronize znode data to client ZK cluster
298   */
299  private final class ClientZkUpdater extends Thread {
300    private final String znode;
301    private final ZKData zkData;
302
303    public ClientZkUpdater(String znode, ZKData zkData) {
304      this.znode = znode;
305      this.zkData = zkData;
306      setName("ClientZKUpdater-" + znode);
307    }
308
309    @Override
310    public void run() {
311      LOG.debug("Client zk updater for znode {} started", znode);
312      while (!server.isStopped()) {
313        try {
314          byte[] data = zkData.get();
315          if (data != null) {
316            setDataForClientZkUntilSuccess(znode, data);
317          } else {
318            if (zkData.isDeleted()) {
319              deleteDataForClientZkUntilSuccess(znode);
320              break;
321            }
322          }
323        } catch (InterruptedException e) {
324          LOG.debug("Interrupted while checking whether need to update meta location to client zk");
325          Thread.currentThread().interrupt();
326          break;
327        }
328      }
329      LOG.debug("Client zk updater for znode {} stopped", znode);
330    }
331  }
332}