001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2.store;
020
021import java.io.IOException;
022
023import org.apache.yetus.audience.InterfaceAudience;
024import org.apache.yetus.audience.InterfaceStability;
025import org.apache.hadoop.hbase.procedure2.Procedure;
026
027/**
028 * The ProcedureStore is used by the executor to persist the state of each procedure execution.
029 * This allows to resume the execution of pending/in-progress procedures in case
030 * of machine failure or service shutdown.
031 */
032@InterfaceAudience.Private
033@InterfaceStability.Evolving
034public interface ProcedureStore {
035  /**
036   * Store listener interface.
037   * <p/>
038   * The main process should register a listener and respond to the store events.
039   */
040  public interface ProcedureStoreListener {
041
042    /**
043     * triggered when the store sync is completed.
044     */
045    default void postSync() {
046    }
047
048    /**
049     * triggered when the store is not able to write out data. the main process should abort.
050     */
051    default void abortProcess() {
052    }
053
054    /**
055     * Suggest that the upper layer should update the state of some procedures. Ignore this call
056     * will not effect correctness but performance.
057     * <p/>
058     * For a WAL based ProcedureStore implementation, if all the procedures stored in a WAL file
059     * have been deleted, or updated later in another WAL file, then we can delete the WAL file. If
060     * there are old procedures in a WAL file which are never deleted or updated, then we can not
061     * delete the WAL file and this will cause we hold lots of WAL file and slow down the master
062     * restarts. So here we introduce this method to tell the upper layer that please update the
063     * states of these procedures so that we can delete the old WAL file.
064     * @param procIds the id for the procedures
065     */
066    default void forceUpdate(long[] procIds) {
067    }
068  }
069
070  /**
071   * An Iterator over a collection of Procedure
072   */
073  public interface ProcedureIterator {
074    /**
075     * Reset the Iterator by seeking to the beginning of the list.
076     */
077    void reset();
078
079    /**
080     * Returns true if the iterator has more elements.
081     * (In other words, returns true if next() would return a Procedure
082     * rather than throwing an exception.)
083     * @return true if the iterator has more procedures
084     */
085    boolean hasNext();
086
087    /**
088     * Calling this method does not need to convert the protobuf message to the Procedure class, so
089     * if it returns true we can call {@link #skipNext()} to skip the procedure without
090     * deserializing. This could increase the performance.
091     * @return true if the iterator next element is a completed procedure.
092     */
093    boolean isNextFinished();
094
095    /**
096     * Skip the next procedure
097     * <p/>
098     * This method is used to skip the deserializing of the procedure to increase performance, as
099     * when calling next we need to convert the protobuf message to the Procedure class.
100     */
101    void skipNext();
102
103    /**
104     * Returns the next procedure in the iteration.
105     * @throws IOException if there was an error fetching/deserializing the procedure
106     * @return the next procedure in the iteration.
107     */
108    @SuppressWarnings("rawtypes")
109    Procedure next() throws IOException;
110  }
111
112  /**
113   * Interface passed to the ProcedureStore.load() method to handle the store-load events.
114   */
115  public interface ProcedureLoader {
116    /**
117     * Called by ProcedureStore.load() to notify about the maximum proc-id in the store.
118     * @param maxProcId the highest proc-id in the store
119     */
120    void setMaxProcId(long maxProcId);
121
122    /**
123     * Called by the ProcedureStore.load() every time a set of procedures are ready to be executed.
124     * The ProcedureIterator passed to the method, has the procedure sorted in replay-order.
125     * @param procIter iterator over the procedures ready to be added to the executor.
126     */
127    void load(ProcedureIterator procIter) throws IOException;
128
129    /**
130     * Called by the ProcedureStore.load() in case we have procedures not-ready to be added to
131     * the executor, which probably means they are corrupted since some information/link is missing.
132     * @param procIter iterator over the procedures not ready to be added to the executor, corrupted
133     */
134    void handleCorrupted(ProcedureIterator procIter) throws IOException;
135  }
136
137  /**
138   * Add the listener to the notification list.
139   * @param listener The AssignmentListener to register
140   */
141  void registerListener(ProcedureStoreListener listener);
142
143  /**
144   * Remove the listener from the notification list.
145   * @param listener The AssignmentListener to unregister
146   * @return true if the listner was in the list and it was removed, otherwise false.
147   */
148  boolean unregisterListener(ProcedureStoreListener listener);
149
150  /**
151   * Start/Open the procedure store
152   * @param numThreads
153   */
154  void start(int numThreads) throws IOException;
155
156  /**
157   * Stop/Close the procedure store
158   * @param abort true if the stop is an abort
159   */
160  void stop(boolean abort);
161
162  /**
163   * @return true if the store is running, otherwise false.
164   */
165  boolean isRunning();
166
167  /**
168   * @return the number of threads/slots passed to start()
169   */
170  int getNumThreads();
171
172  /**
173   * Set the number of procedure running.
174   * This can be used, for example, by the store to know how long to wait before a sync.
175   * @return how many procedures are running (may not be same as <code>count</code>).
176   */
177  int setRunningProcedureCount(int count);
178
179  /**
180   * Acquire the lease for the procedure store.
181   */
182  void recoverLease() throws IOException;
183
184  /**
185   * Load the Procedures in the store.
186   * @param loader the ProcedureLoader that will handle the store-load events
187   */
188  void load(ProcedureLoader loader) throws IOException;
189
190  /**
191   * When a procedure is submitted to the executor insert(proc, null) will be called.
192   * 'proc' has a 'RUNNABLE' state and the initial information required to start up.
193   *
194   * When a procedure is executed and it returns children insert(proc, subprocs) will be called.
195   * 'proc' has a 'WAITING' state and an update state.
196   * 'subprocs' are the children in 'RUNNABLE' state with the initial information.
197   *
198   * @param proc the procedure to serialize and write to the store.
199   * @param subprocs the newly created child of the proc.
200   */
201  void insert(Procedure<?> proc, Procedure<?>[] subprocs);
202
203  /**
204   * Serialize a set of new procedures.
205   * These procedures are freshly submitted to the executor and each procedure
206   * has a 'RUNNABLE' state and the initial information required to start up.
207   *
208   * @param procs the procedures to serialize and write to the store.
209   */
210  void insert(Procedure<?>[] procs);
211
212  /**
213   * The specified procedure was executed,
214   * and the new state should be written to the store.
215   * @param proc the procedure to serialize and write to the store.
216   */
217  void update(Procedure<?> proc);
218
219  /**
220   * The specified procId was removed from the executor,
221   * due to completion, abort or failure.
222   * The store implementor should remove all the information about the specified procId.
223   * @param procId the ID of the procedure to remove.
224   */
225  void delete(long procId);
226
227  /**
228   * The parent procedure completed.
229   * Update the state and mark all the child deleted.
230   * @param parentProc the parent procedure to serialize and write to the store.
231   * @param subProcIds the IDs of the sub-procedure to remove.
232   */
233  void delete(Procedure<?> parentProc, long[] subProcIds);
234
235  /**
236   * The specified procIds were removed from the executor,
237   * due to completion, abort or failure.
238   * The store implementor should remove all the information about the specified procIds.
239   * @param procIds the IDs of the procedures to remove.
240   * @param offset the array offset from where to start to delete
241   * @param count the number of IDs to delete
242   */
243  void delete(long[] procIds, int offset, int count);
244}