001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store;
019
020import java.io.IOException;
021
022import org.apache.hadoop.hbase.procedure2.Procedure;
023import org.apache.yetus.audience.InterfaceAudience;
024import org.apache.yetus.audience.InterfaceStability;
025
026/**
027 * The ProcedureStore is used by the executor to persist the state of each procedure execution.
028 * This allows the execution of pending/in-progress procedures to be resumed in case
029 * of machine failure or service shutdown.
030 */
031@InterfaceAudience.Private
032@InterfaceStability.Evolving
033public interface ProcedureStore {
034  /**
035   * Store listener interface; every callback defaults to a no-op.
036   * <p>
037   * The main process should register a listener and respond to the store events.
038   */
039  public interface ProcedureStoreListener {
040
041    /**
042     * Triggered when the store sync is completed.
043     */
044    default void postSync() {
045    }
046
047    /**
048     * Triggered when the store is not able to write out data. The main process should abort.
049     */
050    default void abortProcess() {
051    }
052
053    /**
054     * Suggest that the upper layer should update the state of some procedures. Ignoring this
055     * call will not affect correctness, only performance.
056     * <p>
057     * For a WAL based ProcedureStore implementation, if all the procedures stored in a WAL file
058     * have been deleted, or updated later in another WAL file, then we can delete the WAL file. If
059     * there are old procedures in a WAL file which are never deleted or updated, then we can not
060     * delete the WAL file, so we end up holding lots of WAL files, which slows down master
061     * restarts. This method asks the upper layer to update the states of these procedures so
062     * that the old WAL files can be deleted.
063     * @param procIds the ids of the procedures whose state should be updated
064     */
065    default void forceUpdate(long[] procIds) {
066    }
067  }
068
069  /**
070   * An iterator over a collection of {@link Procedure}s.
071   */
072  public interface ProcedureIterator {
073    /**
074     * Reset the iterator by seeking back to the beginning of the list.
075     */
076    void reset();
077
078    /**
079     * Returns true if the iterator has more elements.
080     * (In other words, returns true if {@link #next()} would return a Procedure
081     * rather than throwing an exception.)
082     * @return true if the iterator has more procedures
083     */
084    boolean hasNext();
085
086    /**
087     * Checks whether the next procedure is finished without converting the protobuf message to
088     * the Procedure class, so if it returns true we can call {@link #skipNext()} to skip the
089     * procedure without deserializing it. This could increase the performance.
090     * @return true if the iterator's next element is a completed procedure.
091     */
092    boolean isNextFinished();
093
094    /**
095     * Skip the next procedure.
096     * <p>
097     * This method is used to skip the deserializing of the procedure to increase performance, as
098     * calling {@link #next()} needs to convert the protobuf message to the Procedure class.
099     */
100    void skipNext();
101
102    /**
103     * Returns the next procedure in the iteration.
104     * @return the next procedure in the iteration
105     * @throws IOException if there was an error fetching/deserializing the procedure
106     */
107    @SuppressWarnings("rawtypes")
108    Procedure next() throws IOException;
109  }
110
111  /**
112   * Callback passed to {@link ProcedureStore#load(ProcedureLoader)} to handle store-load events.
113   */
114  public interface ProcedureLoader {
115    /**
116     * Called by ProcedureStore.load() to notify about the maximum proc-id in the store.
117     * @param maxProcId the highest proc-id in the store
118     */
119    void setMaxProcId(long maxProcId);
120
121    /**
122     * Called by ProcedureStore.load() every time a set of procedures is ready to be executed.
123     * The ProcedureIterator passed to the method has the procedures sorted in replay-order.
124     * @param procIter iterator over the procedures ready to be added to the executor
125     */
126    void load(ProcedureIterator procIter) throws IOException;
127
128    /**
129     * Called by ProcedureStore.load() when some procedures are not ready to be added to the
130     * executor, which probably means they are corrupted since some information/link is missing.
131     * @param procIter iterator over the procedures not ready to be added to the executor
132     */
133    void handleCorrupted(ProcedureIterator procIter) throws IOException;
134  }
135
136  /**
137   * Add the listener to the notification list.
138   * @param listener the {@link ProcedureStoreListener} to register
139   */
140  void registerListener(ProcedureStoreListener listener);
141
142  /**
143   * Remove the listener from the notification list.
144   * @param listener the {@link ProcedureStoreListener} to unregister
145   * @return true if the listener was in the list and it was removed, otherwise false.
146   */
147  boolean unregisterListener(ProcedureStoreListener listener);
148
149  /**
150   * Start/Open the procedure store.
151   * @param numThreads number of threads to be used by the store (see {@link #getNumThreads()})
152   */
153  void start(int numThreads) throws IOException;
154
155  /**
156   * Stop/Close the procedure store.
157   * @param abort true if the stop is an abort, false otherwise
158   */
159  void stop(boolean abort);
160
161  /**
162   * @return {@code true} if the store is running, otherwise {@code false}.
163   */
164  boolean isRunning();
165
166  /**
167   * @return the number of threads/slots passed to {@link #start(int)}
168   */
169  int getNumThreads();
170
171  /**
172   * Set the number of running procedures; a store can use it to decide how long to delay a sync.
173   * @param count the number of procedures currently running
174   * @return how many procedures are running (may not be the same as {@code count}).
175   */
176  int setRunningProcedureCount(int count);
177
178  /**
179   * Acquire the lease for the procedure store. May fail with an {@link IOException}.
180   */
181  void recoverLease() throws IOException;
182
183  /**
184   * Load the procedures in the store.
185   * @param loader the {@link ProcedureLoader} that will handle the store-load events
186   */
187  void load(ProcedureLoader loader) throws IOException;
188
189  /**
190   * When a procedure is submitted to the executor, {@code insert(proc, null)} will be called.
191   * {@code proc} has a 'RUNNABLE' state and the initial information required to start up.
192   * <p>
193   * When a procedure is executed and it returns children, {@code insert(proc, subprocs)} will be
194   * called. {@code proc} has a 'WAITING' state and an updated state, and {@code subprocs} are
195   * the children in 'RUNNABLE' state with the initial information.
196   *
197   * @param proc the procedure to serialize and write to the store.
198   * @param subprocs the newly created children of the proc, or null on initial submission.
199   */
200  void insert(Procedure<?> proc, Procedure<?>[] subprocs);
201
202  /**
203   * Serialize a set of new procedures and write them to the store.
204   * These procedures are freshly submitted to the executor, and each of them
205   * has a 'RUNNABLE' state and the initial information required to start up.
206   *
207   * @param procs the procedures to serialize and write to the store.
208   */
209  void insert(Procedure<?>[] procs);
210
211  /**
212   * Write the new state of a procedure to the store.
213   * Called after the specified procedure was executed.
214   * @param proc the procedure to serialize and write to the store.
215   */
216  void update(Procedure<?> proc);
217
218  /**
219   * The procedure with the specified procId was removed from the executor,
220   * due to completion, abort or failure.
221   * The store implementation should remove all the information about the specified procId.
222   * @param procId the ID of the procedure to remove.
223   */
224  void delete(long procId);
225
226  /**
227   * The parent procedure completed.
228   * Update its state and mark all of its children as deleted.
229   * @param parentProc the parent procedure to serialize and write to the store.
230   * @param subProcIds the IDs of the sub-procedures to remove.
231   */
232  void delete(Procedure<?> parentProc, long[] subProcIds);
233
234  /**
235   * The procedures with the specified procIds were removed from the executor,
236   * due to completion, abort or failure.
237   * The store implementation should remove all the information about the specified procIds.
238   * @param procIds the IDs of the procedures to remove.
239   * @param offset the index in {@code procIds} from which to start deleting
240   * @param count the number of IDs to delete, starting at {@code offset}
241   */
242  void delete(long[] procIds, int offset, int count);
243}