001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2.store; 020 021import java.io.IOException; 022 023import org.apache.yetus.audience.InterfaceAudience; 024import org.apache.yetus.audience.InterfaceStability; 025import org.apache.hadoop.hbase.procedure2.Procedure; 026 027/** 028 * The ProcedureStore is used by the executor to persist the state of each procedure execution. 029 * This allows to resume the execution of pending/in-progress procedures in case 030 * of machine failure or service shutdown. 031 */ 032@InterfaceAudience.Private 033@InterfaceStability.Evolving 034public interface ProcedureStore { 035 /** 036 * Store listener interface. 037 * <p/> 038 * The main process should register a listener and respond to the store events. 039 */ 040 public interface ProcedureStoreListener { 041 042 /** 043 * triggered when the store sync is completed. 044 */ 045 default void postSync() { 046 } 047 048 /** 049 * triggered when the store is not able to write out data. the main process should abort. 050 */ 051 default void abortProcess() { 052 } 053 054 /** 055 * Suggest that the upper layer should update the state of some procedures. Ignore this call 056 * will not effect correctness but performance. 057 * <p/> 058 * For a WAL based ProcedureStore implementation, if all the procedures stored in a WAL file 059 * have been deleted, or updated later in another WAL file, then we can delete the WAL file. If 060 * there are old procedures in a WAL file which are never deleted or updated, then we can not 061 * delete the WAL file and this will cause we hold lots of WAL file and slow down the master 062 * restarts. So here we introduce this method to tell the upper layer that please update the 063 * states of these procedures so that we can delete the old WAL file. 064 * @param procIds the id for the procedures 065 */ 066 default void forceUpdate(long[] procIds) { 067 } 068 } 069 070 /** 071 * An Iterator over a collection of Procedure 072 */ 073 public interface ProcedureIterator { 074 /** 075 * Reset the Iterator by seeking to the beginning of the list. 076 */ 077 void reset(); 078 079 /** 080 * Returns true if the iterator has more elements. 081 * (In other words, returns true if next() would return a Procedure 082 * rather than throwing an exception.) 083 * @return true if the iterator has more procedures 084 */ 085 boolean hasNext(); 086 087 /** 088 * Calling this method does not need to convert the protobuf message to the Procedure class, so 089 * if it returns true we can call {@link #skipNext()} to skip the procedure without 090 * deserializing. This could increase the performance. 091 * @return true if the iterator next element is a completed procedure. 092 */ 093 boolean isNextFinished(); 094 095 /** 096 * Skip the next procedure 097 * <p/> 098 * This method is used to skip the deserializing of the procedure to increase performance, as 099 * when calling next we need to convert the protobuf message to the Procedure class. 100 */ 101 void skipNext(); 102 103 /** 104 * Returns the next procedure in the iteration. 105 * @throws IOException if there was an error fetching/deserializing the procedure 106 * @return the next procedure in the iteration. 107 */ 108 @SuppressWarnings("rawtypes") 109 Procedure next() throws IOException; 110 } 111 112 /** 113 * Interface passed to the ProcedureStore.load() method to handle the store-load events. 114 */ 115 public interface ProcedureLoader { 116 /** 117 * Called by ProcedureStore.load() to notify about the maximum proc-id in the store. 118 * @param maxProcId the highest proc-id in the store 119 */ 120 void setMaxProcId(long maxProcId); 121 122 /** 123 * Called by the ProcedureStore.load() every time a set of procedures are ready to be executed. 124 * The ProcedureIterator passed to the method, has the procedure sorted in replay-order. 125 * @param procIter iterator over the procedures ready to be added to the executor. 126 */ 127 void load(ProcedureIterator procIter) throws IOException; 128 129 /** 130 * Called by the ProcedureStore.load() in case we have procedures not-ready to be added to 131 * the executor, which probably means they are corrupted since some information/link is missing. 132 * @param procIter iterator over the procedures not ready to be added to the executor, corrupted 133 */ 134 void handleCorrupted(ProcedureIterator procIter) throws IOException; 135 } 136 137 /** 138 * Add the listener to the notification list. 139 * @param listener The AssignmentListener to register 140 */ 141 void registerListener(ProcedureStoreListener listener); 142 143 /** 144 * Remove the listener from the notification list. 145 * @param listener The AssignmentListener to unregister 146 * @return true if the listner was in the list and it was removed, otherwise false. 147 */ 148 boolean unregisterListener(ProcedureStoreListener listener); 149 150 /** 151 * Start/Open the procedure store 152 * @param numThreads 153 */ 154 void start(int numThreads) throws IOException; 155 156 /** 157 * Stop/Close the procedure store 158 * @param abort true if the stop is an abort 159 */ 160 void stop(boolean abort); 161 162 /** 163 * @return true if the store is running, otherwise false. 164 */ 165 boolean isRunning(); 166 167 /** 168 * @return the number of threads/slots passed to start() 169 */ 170 int getNumThreads(); 171 172 /** 173 * Set the number of procedure running. 174 * This can be used, for example, by the store to know how long to wait before a sync. 175 * @return how many procedures are running (may not be same as <code>count</code>). 176 */ 177 int setRunningProcedureCount(int count); 178 179 /** 180 * Acquire the lease for the procedure store. 181 */ 182 void recoverLease() throws IOException; 183 184 /** 185 * Load the Procedures in the store. 186 * @param loader the ProcedureLoader that will handle the store-load events 187 */ 188 void load(ProcedureLoader loader) throws IOException; 189 190 /** 191 * When a procedure is submitted to the executor insert(proc, null) will be called. 192 * 'proc' has a 'RUNNABLE' state and the initial information required to start up. 193 * 194 * When a procedure is executed and it returns children insert(proc, subprocs) will be called. 195 * 'proc' has a 'WAITING' state and an update state. 196 * 'subprocs' are the children in 'RUNNABLE' state with the initial information. 197 * 198 * @param proc the procedure to serialize and write to the store. 199 * @param subprocs the newly created child of the proc. 200 */ 201 void insert(Procedure<?> proc, Procedure<?>[] subprocs); 202 203 /** 204 * Serialize a set of new procedures. 205 * These procedures are freshly submitted to the executor and each procedure 206 * has a 'RUNNABLE' state and the initial information required to start up. 207 * 208 * @param procs the procedures to serialize and write to the store. 209 */ 210 void insert(Procedure<?>[] procs); 211 212 /** 213 * The specified procedure was executed, 214 * and the new state should be written to the store. 215 * @param proc the procedure to serialize and write to the store. 216 */ 217 void update(Procedure<?> proc); 218 219 /** 220 * The specified procId was removed from the executor, 221 * due to completion, abort or failure. 222 * The store implementor should remove all the information about the specified procId. 223 * @param procId the ID of the procedure to remove. 224 */ 225 void delete(long procId); 226 227 /** 228 * The parent procedure completed. 229 * Update the state and mark all the child deleted. 230 * @param parentProc the parent procedure to serialize and write to the store. 231 * @param subProcIds the IDs of the sub-procedure to remove. 232 */ 233 void delete(Procedure<?> parentProc, long[] subProcIds); 234 235 /** 236 * The specified procIds were removed from the executor, 237 * due to completion, abort or failure. 238 * The store implementor should remove all the information about the specified procIds. 239 * @param procIds the IDs of the procedures to remove. 240 * @param offset the array offset from where to start to delete 241 * @param count the number of IDs to delete 242 */ 243 void delete(long[] procIds, int offset, int count); 244}