View Javadoc

1   /**
2    * Copyright 2011 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.zookeeper;
21  
22  import java.io.IOException;
23  import java.io.UnsupportedEncodingException;
24  import java.lang.reflect.Field;
25  import java.net.URLDecoder;
26  import java.net.URLEncoder;
27  import java.util.ArrayList;
28  import java.util.Arrays;
29  import java.util.List;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.HBaseFileSystem;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.master.SplitLogManager;
39  import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
40  import org.apache.hadoop.hbase.util.Bytes;
41  
42  /**
43   * Common methods and attributes used by {@link SplitLogManager} and
44   * {@link SplitLogWorker}
45   */
46  public class ZKSplitLog {
47    private static final Log LOG = LogFactory.getLog(ZKSplitLog.class);
48  
49    public static final int DEFAULT_TIMEOUT = 300000; // 5 mins
50    public static final int DEFAULT_ZK_RETRIES = 3;
51    public static final int DEFAULT_MAX_RESUBMIT = 3;
52    public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
53  
54    /**
55     * Gets the full path node name for the log file being split.
56     * This method will url encode the filename.
57     * @param zkw zk reference
58     * @param filename log file name (only the basename)
59     */
60    public static String getEncodedNodeName(ZooKeeperWatcher zkw,
61        String filename) {
62        return ZKUtil.joinZNode(zkw.splitLogZNode, encode(filename));
63    }
64  
65    public static String getFileName(String node) {
66      String basename = node.substring(node.lastIndexOf('/') + 1);
67      return decode(basename);
68    }
69  
70  
71    public static String encode(String s) {
72      try {
73        return URLEncoder.encode(s, "UTF-8");
74      } catch (UnsupportedEncodingException e) {
75        throw new RuntimeException("URLENCODER doesn't support UTF-8");
76      }
77    }
78  
79    public static String decode(String s) {
80      try {
81        return URLDecoder.decode(s, "UTF-8");
82      } catch (UnsupportedEncodingException e) {
83        throw new RuntimeException("URLDecoder doesn't support UTF-8");
84      }
85    }
86  
87    public static String getRescanNode(ZooKeeperWatcher zkw) {
88      return ZKUtil.joinZNode(zkw.splitLogZNode, "RESCAN");
89    }
90  
91    public static boolean isRescanNode(ZooKeeperWatcher zkw, String path) {
92      String prefix = getRescanNode(zkw);
93      if (path.length() <= prefix.length()) {
94        return false;
95      }
96      for (int i = 0; i < prefix.length(); i++) {
97        if (prefix.charAt(i) != path.charAt(i)) {
98          return false;
99        }
100     }
101     return true;
102   }
103 
104   public static boolean isTaskPath(ZooKeeperWatcher zkw, String path) {
105     String dirname = path.substring(0, path.lastIndexOf('/'));
106     return dirname.equals(zkw.splitLogZNode);
107   }
108 
109   public static enum TaskState {
110     TASK_UNASSIGNED("unassigned"),
111     TASK_OWNED("owned"),
112     TASK_RESIGNED("resigned"),
113     TASK_DONE("done"),
114     TASK_ERR("err");
115 
116     private final byte[] state;
117     private TaskState(String s) {
118       state = s.getBytes();
119     }
120 
121     public byte[] get(String serverName) {
122       return (Bytes.add(state, " ".getBytes(), serverName.getBytes()));
123     }
124 
125     public String getWriterName(byte[] data) {
126       String str = Bytes.toString(data);
127       return str.substring(str.indexOf(' ') + 1);
128     }
129 
130 
131     /**
132      * @param s
133      * @return True if {@link #state} is a prefix of s. False otherwise.
134      */
135     public boolean equals(byte[] s) {
136       if (s.length < state.length) {
137         return (false);
138       }
139       for (int i = 0; i < state.length; i++) {
140         if (state[i] != s[i]) {
141           return (false);
142         }
143       }
144       return (true);
145     }
146 
147     public boolean equals(byte[] s, String serverName) {
148       return (Arrays.equals(s, get(serverName)));
149     }
150     @Override
151     public String toString() {
152       return new String(state);
153     }
154   }
155 
156   public static Path getSplitLogDir(Path rootdir, String tmpname) {
157     return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
158   }
159 
160   public static String getSplitLogDirTmpComponent(String worker, String file) {
161     return (worker + "_" + ZKSplitLog.encode(file));
162   }
163 
164   public static void markCorrupted(Path rootdir, String logFileName,
165       FileSystem fs) {
166     Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
167     try {
168       HBaseFileSystem.createNewFileOnFileSystem(fs, file);
169     } catch (IOException e) {
170       LOG.warn("Could not flag a log file as corrupted. Failed to create " +
171           file, e);
172     }
173   }
174 
175   public static boolean isCorrupted(Path rootdir, String logFileName,
176       FileSystem fs) throws IOException {
177     Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
178     boolean isCorrupt;
179     isCorrupt = fs.exists(file);
180     return isCorrupt;
181   }
182 
183 
184   public static class Counters {
185     //SplitLogManager counters
186     public static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0);
187     public static AtomicLong tot_mgr_log_split_batch_success =
188       new AtomicLong(0);
189     public static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0);
190     public static AtomicLong tot_mgr_new_unexpected_hlogs = new AtomicLong(0);
191     public static AtomicLong tot_mgr_log_split_start = new AtomicLong(0);
192     public static AtomicLong tot_mgr_log_split_success = new AtomicLong(0);
193     public static AtomicLong tot_mgr_log_split_err = new AtomicLong(0);
194     public static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0);
195     public static AtomicLong tot_mgr_node_create_result = new AtomicLong(0);
196     public static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0);
197     public static AtomicLong tot_mgr_node_create_err = new AtomicLong(0);
198     public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0);
199     public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0);
200     public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0);
201     public static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0);
202     public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0);
203     public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0);
204     public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0);
205     public static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0);
206     public static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0);
207     public static AtomicLong tot_mgr_resubmit = new AtomicLong(0);
208     public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0);
209     public static AtomicLong tot_mgr_null_data = new AtomicLong(0);
210     public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0);
211     public static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0);
212     public static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0);
213     public static AtomicLong tot_mgr_resubmit_threshold_reached =
214       new AtomicLong(0);
215     public static AtomicLong tot_mgr_missing_state_in_delete =
216       new AtomicLong(0);
217     public static AtomicLong tot_mgr_heartbeat = new AtomicLong(0);
218     public static AtomicLong tot_mgr_rescan = new AtomicLong(0);
219     public static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0);
220     public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0);
221     public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0);
222     public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0);
223     public static AtomicLong tot_mgr_resubmit_dead_server_task =
224       new AtomicLong(0);
225 
226 
227 
228     // SplitLogWorker counters
229     public static AtomicLong tot_wkr_failed_to_grab_task_no_data =
230       new AtomicLong(0);
231     public static AtomicLong tot_wkr_failed_to_grab_task_exception =
232       new AtomicLong(0);
233     public static AtomicLong tot_wkr_failed_to_grab_task_owned =
234       new AtomicLong(0);
235     public static AtomicLong tot_wkr_failed_to_grab_task_lost_race =
236       new AtomicLong(0);
237     public static AtomicLong tot_wkr_task_acquired = new AtomicLong(0);
238     public static AtomicLong tot_wkr_task_resigned = new AtomicLong(0);
239     public static AtomicLong tot_wkr_task_done = new AtomicLong(0);
240     public static AtomicLong tot_wkr_task_err = new AtomicLong(0);
241     public static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0);
242     public static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0);
243     public static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0);
244     public static AtomicLong tot_wkr_get_data_result = new AtomicLong(0);
245     public static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0);
246     public static AtomicLong tot_wkr_preempt_task = new AtomicLong(0);
247     public static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0);
248     public static AtomicLong tot_wkr_final_transistion_failed =
249       new AtomicLong(0);
250 
251     public static void resetCounters() throws Exception {
252       Class<?> cl = (new Counters()).getClass();
253       Field[] flds = cl.getDeclaredFields();
254       for (Field fld : flds) {
255         ((AtomicLong)fld.get(null)).set(0);
256       }
257     }
258   }
259 }