package com.nazdaq.workflow.engine.core.manager;

import akka.actor.ActorSystem;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.nazdaq.core.helpers.TextHelper;
import com.nazdaq.workflow.engine.core.compiler.PropertiesCompiler;
import com.nazdaq.workflow.engine.core.compiler.WorkFlowCompiler;
import com.nazdaq.workflow.engine.core.events.IterationFinishStateChangeEvent;
import com.nazdaq.workflow.engine.core.exceptions.IterationRunningException;
import com.nazdaq.workflow.engine.core.exceptions.IterationStartFailException;
import com.nazdaq.workflow.engine.core.exceptions.WorkflowStoppingException;
import com.nazdaq.workflow.engine.core.manager.WorkFlowBase;
import com.nazdaq.workflow.engine.core.models.connections.AbstractNodeConnectionData;
import com.nazdaq.workflow.engine.core.models.connections.NodesConnection;
import com.nazdaq.workflow.engine.core.models.node.AbstractNodeConfigurationData;
import com.nazdaq.workflow.engine.core.models.node.NodeConfiguration;
import com.nazdaq.workflow.engine.core.models.node.NodeValidationCauseType;
import com.nazdaq.workflow.engine.core.processor.NodeResults;
import com.nazdaq.workflow.engine.core.storage.ExecutionStorage;
import com.nazdaq.workflow.engine.core.storage.models.messages.NodeProcessorMessage;
import com.nazdaq.workflow.engine.core.storage.models.state.StateValue;
import com.nazdaq.workflow.engine.core.storage.stores.MessagesStorage;
import com.nazdaq.workflow.engine.core.storage.stores.StatsStorage;
import com.nazdaq.workflow.engine.dag.graph.DefaultDag;
import com.nazdaq.workflow.engine.helpers.ExecutionGraphPrint;
import com.nazdaq.workflow.engine.helpers.NodeGraphData;
import com.nazdaq.workflow.graphql.models.WorkFlowInputSet;
import com.nazdaq.workflow.graphql.models.deplyment.DeployStateFrom;
import com.nazdaq.workflow.graphql.models.execution.iteration.IterationStartInput;
import com.nazdaq.workflow.graphql.models.execution.trigger.TriggerStartStopInput;
import com.nazdaq.workflow.graphql.models.properties.PropertyInput;
import com.nazdaq.workflow.graphql.models.workflowinput.NodesConnectionsPropertiesInput;
import com.nazdaq.workflow.graphql.resolvers.subscriptions.execution.publishers.ExecutionChangePublisher;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import lombok.NonNull;
import models.users.User;
import models.workflow.builder.WorkFlow;
import models.workflow.builder.WorkFlowEnvironment;
import models.workflow.builder.WorkFlowParent;
import models.workflow.builder.configs.WorkFlowConfigs;
import models.workflow.builder.runtime.WorkFlowDeployment;
import models.workflow.executions.WorkFlowExecution;
import models.workflow.executions.WorkFlowExecutionStatus;
import models.workflow.executions.iterations.WorkFlowExecutionIteration;
import models.workflow.executions.iterations.WorkFlowExecutionIterationStatus;
import models.workflow.executions.iterations.nodes.ExecutionNodeStatus;
import models.workflow.executions.iterations.nodes.WorkFlowExecutionNode;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nazdaq/workflow/engine/core/manager/WorkFlowExecutionManager.class */
public final class WorkFlowExecutionManager extends Thread implements Runnable {
    // Class-level logger (run() uses it for the "Thread interrupted" trace).
    private static final Logger log = LoggerFactory.getLogger(WorkFlowExecutionManager.class);

    // Factory handed in through the constructor; also passed on to the IterationsThreadPool.
    @JsonIgnore
    private final WorkFlowFactory workFlowFactory;

    // Actor system handed in through the constructor; not used in the visible part of this class.
    @JsonIgnore
    private final ActorSystem actorSystem;

    // Bundle this manager serves: exposes parent, workflow, execution, environment and key.
    @JsonIgnore
    private final WorkFlowBase base;

    // User passed to the constructor; run() records it on the execution as "started by".
    @JsonIgnore
    private final User startedBy;

    // Pool created in the constructor; used by startTrigger/stopTrigger and initialised in run().
    @JsonIgnore
    private final TriggersThreadPool triggersPool;

    // Pool created in the constructor; used to start and stop iterations asynchronously.
    @JsonIgnore
    private final IterationsThreadPool iterationsPool;
    // Graph structure and its printer, both created in the constructor.
    private final DefaultDag<String, NodeResults> dagGraph;
    private final ExecutionGraphPrint dagGraphPrint;
    // Set to true as the last assignment of the constructor.
    private final boolean loaded;
    // Value of the constructor's int argument.
    private final int recoverRetries;

    // Not assigned by the constructor; run() fails with a NullPointerException if it is still null.
    @JsonIgnore
    private StartInputs startInputs;

    // Defaulted in the constructor to a non-forced stop with the message "Manager stopped.".
    @JsonIgnore
    private StopInputs stopInputs;
    // Configs currently applied; replaced by refreshBase() and applyConfigs() under updateWorkFlowLock.
    private WorkFlowConfigs workFlowConfigs;

    @JsonIgnore
    private final WorkFlowCompiler compiler;

    @JsonIgnore
    private final PropertiesCompiler propertiesCompiler;
    // NOTE(review): presumably maintained by initCleanUpJob() - not assigned in the visible code.
    private int cleanUpAfterLimit;

    // Held by refreshBase() and applyConfigs() while they swap configs.
    @JsonIgnore
    private final ReentrantLock updateWorkFlowLock = new ReentrantLock();

    // NOTE(review): presumably keyed by node id - confirm against updateNodesConnectionsProperties().
    @JsonIgnore
    private final ConcurrentHashMap<String, NodeConfiguration<? extends AbstractNodeConfigurationData>> nodes = new ConcurrentHashMap<>();

    // NOTE(review): presumably keyed by connection id - confirm against updateNodesConnectionsProperties().
    @JsonIgnore
    private final ConcurrentHashMap<String, NodesConnection<? extends AbstractNodeConnectionData>> connections = new ConcurrentHashMap<>();

    @JsonIgnore
    private final ConcurrentHashMap<String, PropertyInput> globalStaticProperties = new ConcurrentHashMap<>();

    @JsonIgnore
    private final ConcurrentHashMap<String, PropertyInput> globalScriptProperties = new ConcurrentHashMap<>();

    // Set-valued multimap backed by a ConcurrentHashMap whose value sets come from ConcurrentHashMap.newKeySet().
    @JsonIgnore
    private final Multimap<String, NodesConnection<? extends AbstractNodeConnectionData>> nodeDestinations = Multimaps.newSetMultimap(new ConcurrentHashMap(), ConcurrentHashMap::newKeySet);
    // Lifecycle flags. run() sets 'started' once the execution is marked LIVE, 'startedFully' after
    // triggers are initialised and queued iterations are started, and 'stopped' after stopManager().
    // NOTE(review): 'stopping' is not written in the visible code - presumably set by the stop path.
    private volatile boolean stopping = false;
    private volatile boolean stopped = false;
    private volatile boolean started = false;
    private volatile boolean startedFully = false;
    // NOTE(review): presumably configured by initCleanUpJob() - not assigned in the visible code.
    private Duration cleanUpOlderThanDuration = null;
    private volatile boolean cleaningUp = false;

    /* loaded from: input_file:com/nazdaq/workflow/engine/core/manager/WorkFlowExecutionManager$StartInputs.class */
    /**
     * Immutable set of options describing how a manager should be started.
     *
     * <p>Instances are created through {@link #builder()} (or copied with {@link #toBuilder()}).
     * {@code base}, {@code startedByUser} and {@code startMessage} are mandatory: a {@code null}
     * value is rejected with a {@link NullPointerException}, both by the builder setters and by
     * the constructor. {@code autoDisableStartedTriggers} falls back to {@code false} when the
     * builder setter was never called.
     */
    public static class StartInputs {

        @NonNull
        private final WorkFlowBase base;
        private final int revision;
        private final boolean autoCreateExecution;
        private final NodesConnectionsPropertiesInput nodesAndConnections;

        @NonNull
        private final User startedByUser;

        @NonNull
        private final String startMessage;
        private final DeployStateFrom preserveStateFrom;
        private final List<StateValue> stateValues;
        private final boolean autoStart;
        private final boolean startQueued;
        private final boolean autoDisableStartedTriggers;
        private final boolean startAllTriggers;
        private final Set<String> startedTriggers;
        private final int recoverRetries;

        /**
         * Self-typed builder so that subclasses of {@link StartInputs} can extend it while keeping
         * a fluent API.
         *
         * @param <C> concrete {@link StartInputs} type produced by {@link #build()}
         * @param <B> concrete builder type returned by every setter
         */
        public static abstract class StartInputsBuilder<C extends StartInputs, B extends StartInputsBuilder<C, B>> {
            private WorkFlowBase base;
            private int revision;
            private boolean autoCreateExecution;
            private NodesConnectionsPropertiesInput nodesAndConnections;
            private User startedByUser;
            private String startMessage;
            private DeployStateFrom preserveStateFrom;
            private List<StateValue> stateValues;
            private boolean autoStart;
            private boolean startQueued;
            // The "$set" flag records whether the setter ran; otherwise the default applies.
            private boolean autoDisableStartedTriggers$set;
            private boolean autoDisableStartedTriggers$value;
            private boolean startAllTriggers;
            private Set<String> startedTriggers;
            private int recoverRetries;

            /** Copies every value of {@code c} into this builder and returns the builder. */
            protected B $fillValuesFrom(C c) {
                $fillValuesFromInstanceIntoBuilder(c, this);
                return self();
            }

            /** Pushes each field of {@code source} through the matching setter of {@code target}. */
            private static void $fillValuesFromInstanceIntoBuilder(StartInputs source, StartInputsBuilder<?, ?> target) {
                target.base(source.base);
                target.revision(source.revision);
                target.autoCreateExecution(source.autoCreateExecution);
                target.nodesAndConnections(source.nodesAndConnections);
                target.startedByUser(source.startedByUser);
                target.startMessage(source.startMessage);
                target.preserveStateFrom(source.preserveStateFrom);
                target.stateValues(source.stateValues);
                target.autoStart(source.autoStart);
                target.startQueued(source.startQueued);
                target.autoDisableStartedTriggers(source.autoDisableStartedTriggers);
                target.startAllTriggers(source.startAllTriggers);
                target.startedTriggers(source.startedTriggers);
                target.recoverRetries(source.recoverRetries);
            }

            /** @throws NullPointerException if {@code base} is {@code null} */
            public B base(@NonNull WorkFlowBase base) {
                this.base = Objects.requireNonNull(base, "base is marked non-null but is null");
                return self();
            }

            public B revision(int revision) {
                this.revision = revision;
                return self();
            }

            public B autoCreateExecution(boolean autoCreateExecution) {
                this.autoCreateExecution = autoCreateExecution;
                return self();
            }

            public B nodesAndConnections(NodesConnectionsPropertiesInput nodesAndConnections) {
                this.nodesAndConnections = nodesAndConnections;
                return self();
            }

            /** @throws NullPointerException if {@code startedByUser} is {@code null} */
            public B startedByUser(@NonNull User startedByUser) {
                this.startedByUser = Objects.requireNonNull(startedByUser, "startedByUser is marked non-null but is null");
                return self();
            }

            /** @throws NullPointerException if {@code startMessage} is {@code null} */
            public B startMessage(@NonNull String startMessage) {
                this.startMessage = Objects.requireNonNull(startMessage, "startMessage is marked non-null but is null");
                return self();
            }

            public B preserveStateFrom(DeployStateFrom preserveStateFrom) {
                this.preserveStateFrom = preserveStateFrom;
                return self();
            }

            public B stateValues(List<StateValue> stateValues) {
                this.stateValues = stateValues;
                return self();
            }

            public B autoStart(boolean autoStart) {
                this.autoStart = autoStart;
                return self();
            }

            public B startQueued(boolean startQueued) {
                this.startQueued = startQueued;
                return self();
            }

            /** Stores the value and marks it as explicitly set, which disables the default. */
            public B autoDisableStartedTriggers(boolean autoDisableStartedTriggers) {
                this.autoDisableStartedTriggers$value = autoDisableStartedTriggers;
                this.autoDisableStartedTriggers$set = true;
                return self();
            }

            public B startAllTriggers(boolean startAllTriggers) {
                this.startAllTriggers = startAllTriggers;
                return self();
            }

            public B startedTriggers(Set<String> startedTriggers) {
                this.startedTriggers = startedTriggers;
                return self();
            }

            public B recoverRetries(int recoverRetries) {
                this.recoverRetries = recoverRetries;
                return self();
            }

            /** Returns this builder typed as {@code B}. */
            protected abstract B self();

            /** Creates the configured {@link StartInputs} instance. */
            public abstract C build();

            @Override
            public String toString() {
                return "WorkFlowExecutionManager.StartInputs.StartInputsBuilder(base=" + this.base
                        + ", revision=" + this.revision
                        + ", autoCreateExecution=" + this.autoCreateExecution
                        + ", nodesAndConnections=" + this.nodesAndConnections
                        + ", startedByUser=" + this.startedByUser
                        + ", startMessage=" + this.startMessage
                        + ", preserveStateFrom=" + this.preserveStateFrom
                        + ", stateValues=" + this.stateValues
                        + ", autoStart=" + this.autoStart
                        + ", startQueued=" + this.startQueued
                        + ", autoDisableStartedTriggers$value=" + this.autoDisableStartedTriggers$value
                        + ", startAllTriggers=" + this.startAllTriggers
                        + ", startedTriggers=" + this.startedTriggers
                        + ", recoverRetries=" + this.recoverRetries
                        + ")";
            }
        }

        /** Concrete builder handed out by {@link StartInputs#builder()}. */
        public static final class StartInputsBuilderImpl extends StartInputsBuilder<StartInputs, StartInputsBuilderImpl> {
            private StartInputsBuilderImpl() {
            }

            @Override
            public StartInputsBuilderImpl self() {
                return this;
            }

            @Override
            public StartInputs build() {
                return new StartInputs(this);
            }
        }

        /** Default used when {@code autoDisableStartedTriggers} was never set on the builder. */
        private static boolean $default$autoDisableStartedTriggers() {
            return false;
        }

        /**
         * Copies the builder state into the new instance.
         *
         * @throws NullPointerException if {@code base}, {@code startedByUser} or
         *                              {@code startMessage} is missing from the builder
         */
        protected StartInputs(StartInputsBuilder<?, ?> builder) {
            this.base = Objects.requireNonNull(builder.base, "base is marked non-null but is null");
            this.revision = builder.revision;
            this.autoCreateExecution = builder.autoCreateExecution;
            this.nodesAndConnections = builder.nodesAndConnections;
            this.startedByUser = Objects.requireNonNull(builder.startedByUser, "startedByUser is marked non-null but is null");
            this.startMessage = Objects.requireNonNull(builder.startMessage, "startMessage is marked non-null but is null");
            this.preserveStateFrom = builder.preserveStateFrom;
            this.stateValues = builder.stateValues;
            this.autoStart = builder.autoStart;
            this.startQueued = builder.startQueued;
            this.autoDisableStartedTriggers = builder.autoDisableStartedTriggers$set
                    ? builder.autoDisableStartedTriggers$value
                    : $default$autoDisableStartedTriggers();
            this.startAllTriggers = builder.startAllTriggers;
            this.startedTriggers = builder.startedTriggers;
            this.recoverRetries = builder.recoverRetries;
        }

        /** Returns an empty builder. */
        public static StartInputsBuilder<?, ?> builder() {
            return new StartInputsBuilderImpl();
        }

        /** Returns a builder pre-populated with the values of this instance. */
        public StartInputsBuilder<?, ?> toBuilder() {
            return new StartInputsBuilderImpl().$fillValuesFrom(this);
        }

        @NonNull
        public WorkFlowBase getBase() {
            return base;
        }

        public int getRevision() {
            return revision;
        }

        public boolean isAutoCreateExecution() {
            return autoCreateExecution;
        }

        public NodesConnectionsPropertiesInput getNodesAndConnections() {
            return nodesAndConnections;
        }

        @NonNull
        public User getStartedByUser() {
            return startedByUser;
        }

        @NonNull
        public String getStartMessage() {
            return startMessage;
        }

        public DeployStateFrom getPreserveStateFrom() {
            return preserveStateFrom;
        }

        public List<StateValue> getStateValues() {
            return stateValues;
        }

        public boolean isAutoStart() {
            return autoStart;
        }

        public boolean isStartQueued() {
            return startQueued;
        }

        public boolean isAutoDisableStartedTriggers() {
            return autoDisableStartedTriggers;
        }

        public boolean isStartAllTriggers() {
            return startAllTriggers;
        }

        public Set<String> getStartedTriggers() {
            return startedTriggers;
        }

        public int getRecoverRetries() {
            return recoverRetries;
        }
    }

    /* loaded from: input_file:com/nazdaq/workflow/engine/core/manager/WorkFlowExecutionManager$StopInputs.class */
    /**
     * Immutable set of options describing how a manager should be stopped.
     *
     * <p>{@code key} and {@code stopMessage} are mandatory and rejected with a
     * {@link NullPointerException} when {@code null}. The four boolean options default to
     * {@code false} when their builder setter was never called. {@link #toString()} leaves out
     * {@code key} and {@code byUser}.
     */
    public static class StopInputs {

        @NonNull
        private final WorkFlowBase.Key key;
        private final User byUser;

        @NonNull
        private final String stopMessage;
        private final boolean recoverable;
        private final boolean force;
        private final boolean markPaused;
        private final boolean closeStorage;

        /** Fluent builder for {@link StopInputs}; obtain one through {@link StopInputs#builder()}. */
        public static class StopInputsBuilder {
            private WorkFlowBase.Key key;
            private User byUser;
            private String stopMessage;
            // Each "$set" flag records whether the setter ran; otherwise the default applies.
            private boolean recoverable$set;
            private boolean recoverable$value;
            private boolean force$set;
            private boolean force$value;
            private boolean markPaused$set;
            private boolean markPaused$value;
            private boolean closeStorage$set;
            private boolean closeStorage$value;

            StopInputsBuilder() {
            }

            /** @throws NullPointerException if {@code key} is {@code null} */
            public StopInputsBuilder key(@NonNull WorkFlowBase.Key key) {
                this.key = Objects.requireNonNull(key, "key is marked non-null but is null");
                return this;
            }

            public StopInputsBuilder byUser(User byUser) {
                this.byUser = byUser;
                return this;
            }

            /** @throws NullPointerException if {@code stopMessage} is {@code null} */
            public StopInputsBuilder stopMessage(@NonNull String stopMessage) {
                this.stopMessage = Objects.requireNonNull(stopMessage, "stopMessage is marked non-null but is null");
                return this;
            }

            public StopInputsBuilder recoverable(boolean recoverable) {
                this.recoverable$value = recoverable;
                this.recoverable$set = true;
                return this;
            }

            public StopInputsBuilder force(boolean force) {
                this.force$value = force;
                this.force$set = true;
                return this;
            }

            public StopInputsBuilder markPaused(boolean markPaused) {
                this.markPaused$value = markPaused;
                this.markPaused$set = true;
                return this;
            }

            public StopInputsBuilder closeStorage(boolean closeStorage) {
                this.closeStorage$value = closeStorage;
                this.closeStorage$set = true;
                return this;
            }

            /**
             * Creates the {@link StopInputs}, substituting the default for every boolean option
             * that was not explicitly set.
             *
             * @throws NullPointerException if {@code key} or {@code stopMessage} was never set
             */
            public StopInputs build() {
                final boolean recoverableFlag = this.recoverable$set ? this.recoverable$value : StopInputs.$default$recoverable();
                final boolean forceFlag = this.force$set ? this.force$value : StopInputs.$default$force();
                final boolean markPausedFlag = this.markPaused$set ? this.markPaused$value : StopInputs.$default$markPaused();
                final boolean closeStorageFlag = this.closeStorage$set ? this.closeStorage$value : StopInputs.$default$closeStorage();
                return new StopInputs(this.key, this.byUser, this.stopMessage, recoverableFlag, forceFlag, markPausedFlag, closeStorageFlag);
            }

            @Override
            public String toString() {
                return "WorkFlowExecutionManager.StopInputs.StopInputsBuilder(key=" + this.key
                        + ", byUser=" + this.byUser
                        + ", stopMessage=" + this.stopMessage
                        + ", recoverable$value=" + this.recoverable$value
                        + ", force$value=" + this.force$value
                        + ", markPaused$value=" + this.markPaused$value
                        + ", closeStorage$value=" + this.closeStorage$value
                        + ")";
            }
        }

        private static boolean $default$recoverable() {
            return false;
        }

        private static boolean $default$force() {
            return false;
        }

        private static boolean $default$markPaused() {
            return false;
        }

        private static boolean $default$closeStorage() {
            return false;
        }

        /**
         * @throws NullPointerException if {@code key} or {@code stopMessage} is {@code null}
         *                              (checked in that order, before any field is assigned)
         */
        StopInputs(@NonNull WorkFlowBase.Key key, User byUser, @NonNull String stopMessage, boolean recoverable, boolean force, boolean markPaused, boolean closeStorage) {
            Objects.requireNonNull(key, "key is marked non-null but is null");
            Objects.requireNonNull(stopMessage, "stopMessage is marked non-null but is null");
            this.key = key;
            this.byUser = byUser;
            this.stopMessage = stopMessage;
            this.recoverable = recoverable;
            this.force = force;
            this.markPaused = markPaused;
            this.closeStorage = closeStorage;
        }

        /** Returns an empty builder. */
        public static StopInputsBuilder builder() {
            return new StopInputsBuilder();
        }

        @Override
        public String toString() {
            return "WorkFlowExecutionManager.StopInputs(stopMessage=" + getStopMessage()
                    + ", recoverable=" + isRecoverable()
                    + ", force=" + isForce()
                    + ", markPaused=" + isMarkPaused()
                    + ", closeStorage=" + isCloseStorage()
                    + ")";
        }

        @NonNull
        public WorkFlowBase.Key getKey() {
            return key;
        }

        public User getByUser() {
            return byUser;
        }

        @NonNull
        public String getStopMessage() {
            return stopMessage;
        }

        public boolean isRecoverable() {
            return recoverable;
        }

        public boolean isForce() {
            return force;
        }

        public boolean isMarkPaused() {
            return markPaused;
        }

        public boolean isCloseStorage() {
            return closeStorage;
        }
    }

    /**
     * Builds a manager thread for the execution carried by {@code workFlowBase}.
     *
     * <p>The constructor only wires the manager: it creates the DAG, compilers and thread pools,
     * loads the supplied nodes/connections/properties, sets up the clean-up job and names the
     * thread {@code "b2data-" + execution id}. It does not start the thread.
     *
     * @param workFlowFactory factory kept by the manager and passed to the iterations pool
     * @param actorSystem actor system kept by the manager
     * @param workFlowBase bundle whose execution must be non-null
     * @param nodesConnectionsPropertiesInput initial graph definition applied via
     *        {@code updateNodesConnectionsProperties}
     * @param user user recorded as the one who started the manager
     * @param i value stored as {@code recoverRetries}
     * @throws NullPointerException if {@code workFlowBase.getExecution()} is {@code null}
     * @throws ExecutionException declared by the signature; NOTE(review): presumably raised by a
     *         helper called here - the origin is not visible in this block
     * @throws ClassNotFoundException declared by the signature; origin not visible in this block
     */
    public WorkFlowExecutionManager(@NotNull WorkFlowFactory workFlowFactory, ActorSystem actorSystem, @NotNull WorkFlowBase workFlowBase, @NotNull NodesConnectionsPropertiesInput nodesConnectionsPropertiesInput, @NotNull User user, int i) throws ExecutionException, ClassNotFoundException {
        Objects.requireNonNull(workFlowBase.getExecution(), "Execution can't be null in manager!");
        this.workFlowFactory = workFlowFactory;
        this.actorSystem = actorSystem;
        this.base = workFlowBase;
        this.startedBy = user;
        // Default stop request; key() reads this.base, so it must come after the assignment above.
        this.stopInputs = StopInputs.builder().key(key()).byUser(null).stopMessage("Manager stopped.").build();
        this.recoverRetries = i;
        this.workFlowConfigs = workFlowBase.getWorkFlow().getConfigs();
        // NOTE(review): the boolean result is discarded - presumably called for a side effect of
        // storage() (e.g. opening the store); confirm, since run() later fails if it is not open.
        storage().isOpen();
        this.base.getExecution().setStartedAt(Instant.now());
        this.dagGraph = new DefaultDag<>();
        // The helpers below receive 'this' while the manager is still being constructed.
        this.dagGraphPrint = new ExecutionGraphPrint(this);
        this.compiler = new WorkFlowCompiler(this);
        this.propertiesCompiler = new PropertiesCompiler(this);
        this.triggersPool = new TriggersThreadPool(this);
        this.iterationsPool = new IterationsThreadPool(workFlowFactory, this);
        updateNodesConnectionsProperties(nodesConnectionsPropertiesInput);
        initCleanUpJob();
        setName("b2data-" + this.base.getExecution().getId());
        this.loaded = true;
        logger().debug("Manager for {} loaded successfully.", this.base);
    }

    /**
     * Reloads the parent and the workflow held by {@code base}, then re-reads the workflow
     * configs and resets the execution logger. Runs under {@code updateWorkFlowLock}.
     */
    public void refreshBase() {
        updateWorkFlowLock.lock();
        try {
            base.getParent().refresh();
            base.getWorkFlow().refresh();

            // NOTE(review): the execution receives the configs held BEFORE the re-read on the
            // next line - confirm this ordering is intended and not swapped.
            final WorkFlowConfigs configsBeforeRefresh = workFlowConfigs;
            base.getExecution().setManagerConfigs(configsBeforeRefresh);
            workFlowConfigs = base.getWorkFlow().getConfigs();

            getExecution().resetLogger();
            logger().debug("Updated base for {}.", base);
        } finally {
            updateWorkFlowLock.unlock();
        }
    }

    /**
     * Replaces the manager's configs with {@code workFlowConfigs} and re-initialises the
     * clean-up job. Runs under {@code updateWorkFlowLock}; the lock is released on both the
     * normal and the exceptional path.
     *
     * @param workFlowConfigs configs to apply
     */
    public void applyConfigs(WorkFlowConfigs workFlowConfigs) {
        updateWorkFlowLock.lock();
        try {
            final long begunAt = TextHelper.startTime();
            this.workFlowConfigs = workFlowConfigs;
            initCleanUpJob();
            logger().info("Finished updating configs to storage, Took: {}", TextHelper.endTime(begunAt));
        } finally {
            updateWorkFlowLock.unlock();
        }
    }

    /** Returns the workflow parent held by {@code base}. */
    @JsonIgnore
    public WorkFlowParent getWorkFlowParent() {
        final WorkFlowParent parent = base.getParent();
        return parent;
    }

    /** Returns the workflow held by {@code base}. */
    @JsonIgnore
    public WorkFlow getWorkFlow() {
        final WorkFlow workFlow = base.getWorkFlow();
        return workFlow;
    }

    /** Returns the execution held by {@code base}. */
    @JsonIgnore
    public WorkFlowExecution getExecution() {
        final WorkFlowExecution execution = base.getExecution();
        return execution;
    }

    /**
     * Tells whether {@code base} reports the {@link WorkFlowEnvironment#DEV} environment.
     *
     * @return {@code true} when the environment equals {@code DEV}
     */
    public boolean isDev() {
        final boolean devEnvironment = base.getEnv().equals(WorkFlowEnvironment.DEV);
        return devEnvironment;
    }

    /** Returns the key reported by {@code base}. */
    public WorkFlowBase.Key key() {
        final WorkFlowBase.Key baseKey = base.key();
        return baseKey;
    }

    /**
     * Updates the current stats in storage, persists the execution via {@code updateAll()} and
     * publishes the execution to {@link ExecutionChangePublisher}.
     */
    private void setUpdatedStats() {
        storage().getStatsStorage().updateCurrent();

        base.getExecution().updateAll();
        ExecutionChangePublisher.publishExecution(base.getExecution());
    }

    /**
     * Creates a new {@code QUEUED} iteration and hands it to the iterations pool to be started
     * asynchronously.
     *
     * <p>In the DEV environment, a non-null and filled nodes/connections/properties payload on
     * the input is applied to the manager before the iteration is created.
     *
     * @param iterationStartInput start request; its input supplies the "execute as" user
     * @return the newly created iteration
     * @throws ExecutionException declared by the signature; origin not visible in this block
     * @throws IterationStartFailException declared by the signature; origin not visible here
     * @throws WorkflowStoppingException declared by the signature; origin not visible here
     */
    public WorkFlowExecutionIteration startManualIteration(@NotNull IterationStartInput iterationStartInput) throws ExecutionException, IterationStartFailException, WorkflowStoppingException {
        final long begunAt = TextHelper.startTime();
        final long iterationIndex = storage().getConfigurationStorage().generateIterationIndex().longValue();
        logger().debug("Creating a new iteration {} ...", iterationIndex);

        if (isDev()) {
            final NodesConnectionsPropertiesInput graphUpdate = iterationStartInput.getInput().getNodesConnectionsProperties();
            final boolean hasGraphUpdate = graphUpdate != null && graphUpdate.isFilled();
            if (hasGraphUpdate) {
                updateNodesConnectionsProperties(graphUpdate);
            }
        }

        final WorkFlowExecutionIteration iteration = WorkFlowExecutionIteration.createExecutionIteration(
                this,
                iterationIndex,
                WorkFlowExecutionIterationStatus.QUEUED,
                iterationStartInput.getInput().getExecuteAsUser(),
                null);
        iterationsPool.startAsync(iteration, iterationStartInput, false);

        logger().info("Created a new iteration {} (Took: {})", iterationIndex, TextHelper.endTime(begunAt));
        return iteration;
    }

    /**
     * Re-executes an existing iteration identified by {@code iterationStartInput.getIterationId()}.
     *
     * <p>The iteration is looked up, rejected if it is currently {@code RUNNING}, marked as
     * loading and then handed to the iterations pool to be started asynchronously as a restart.
     * In the DEV environment the nodes/connections/properties payload of the input is applied
     * to the manager first.
     *
     * @param iterationStartInput restart request carrying the iteration id and the input set
     * @return the iteration being re-executed
     * @throws RuntimeException if no iteration exists for the given id
     * @throws IterationRunningException if the iteration is currently running
     * @throws IterationStartFailException declared by the signature; origin not visible here
     * @throws WorkflowStoppingException declared by the signature; origin not visible here
     */
    public WorkFlowExecutionIteration reStartManualIteration(@NotNull IterationStartInput iterationStartInput) throws IterationStartFailException, WorkflowStoppingException {
        long startTime = TextHelper.startTime();
        logger().debug("Re-Executing iteration {} ...", iterationStartInput.getIterationId());
        WorkFlowExecutionIteration byId = WorkFlowExecutionIteration.getById(iterationStartInput.getIterationId(), false);
        if (byId == null) {
            throw new RuntimeException("Runtime Error: Iteration " + iterationStartInput.getIterationId() + " not found!");
        }
        if (byId.getStatus().equals(WorkFlowExecutionIterationStatus.RUNNING)) {
            // Message is user-facing; the original read "try agan later".
            throw new IterationRunningException("Runtime Error: Preview pane is busy. Please try again later.");
        }
        byId.setAsLoading();
        if (isDev()) {
            // NOTE(review): unlike startManualIteration, the payload is applied without a
            // null / isFilled() check - confirm updateNodesConnectionsProperties tolerates that.
            updateNodesConnectionsProperties(iterationStartInput.getInput().getNodesConnectionsProperties());
        }
        this.iterationsPool.startAsync(byId, iterationStartInput, true);
        logger().info("Re-Executing iteration {} (Took: {})", iterationStartInput.getIterationId(), TextHelper.endTime(startTime));
        return byId;
    }

    /**
     * Records a failure as its own iteration: marks {@code workFlowExecutionNode} as
     * {@code FAILED}, creates an iteration with status {@code FAILED} for it and forwards an
     * {@link IterationFinishStateChangeEvent} with the new iteration's id and status to
     * {@code onNodeProgressUpdate}.
     *
     * @param user user passed to the created iteration
     * @param workFlowExecutionNode node whose status is set to {@code FAILED}
     */
    public void createFailedIteration(User user, @NotNull WorkFlowExecutionNode workFlowExecutionNode) {
        final long iterationIndex = storage().getConfigurationStorage().generateIterationIndex().longValue();
        workFlowExecutionNode.setStatus(ExecutionNodeStatus.FAILED);

        final WorkFlowExecutionIteration failedIteration = WorkFlowExecutionIteration.createExecutionIteration(
                this,
                iterationIndex,
                WorkFlowExecutionIterationStatus.FAILED,
                user,
                workFlowExecutionNode);

        onNodeProgressUpdate(
                IterationFinishStateChangeEvent.builder()
                        .iterationId(failedIteration.getId())
                        .status(failedIteration.getStatus())
                        .build());
    }

    /**
     * Delegates to the iterations pool to stop an iteration asynchronously.
     *
     * @param str first argument forwarded to {@code stopIterationAsync}
     *            (NOTE(review): presumably the iteration id)
     * @param z second argument forwarded to {@code stopIterationAsync}
     *          (NOTE(review): presumably a "force" flag - confirm against the pool)
     * @return the future returned by the pool
     */
    public CompletableFuture<Void> stopIteration(@NotNull String str, boolean z) {
        final CompletableFuture<Void> stopFuture = iterationsPool.stopIterationAsync(str, z);
        return stopFuture;
    }

    /**
     * Starts every trigger listed in {@code triggerStartStopInput}, in iteration order, through
     * the triggers pool. In the DEV environment the nodes/connections/properties payload of the
     * input is applied to the manager first.
     *
     * @param user user passed to the pool for each trigger
     * @param triggerStartStopInput request carrying the trigger ids and the input set
     * @throws Exception declared by the signature; a failure aborts the remaining triggers
     */
    public void startTrigger(User user, @NotNull TriggerStartStopInput triggerStartStopInput) throws Exception {
        if (isDev()) {
            updateNodesConnectionsProperties(triggerStartStopInput.getInput().getNodesConnectionsProperties());
        }
        for (String triggerId : triggerStartStopInput.getTriggers()) {
            triggersPool.startTriggerById(triggerId, user);
        }
    }

    /**
     * Stops every trigger listed in {@code triggerStartStopInput}, in iteration order. Each
     * listed id is resolved with {@code getNodeConfig} and the resulting instance id is what is
     * passed to the triggers pool. In the DEV environment the nodes/connections/properties
     * payload of the input is applied to the manager first.
     *
     * @param user user passed to the pool for each trigger
     * @param triggerStartStopInput request carrying the trigger ids and the input set
     * @throws Exception declared by the signature; a failure aborts the remaining triggers
     */
    public void stopTrigger(User user, @NotNull TriggerStartStopInput triggerStartStopInput) throws Exception {
        if (isDev()) {
            updateNodesConnectionsProperties(triggerStartStopInput.getInput().getNodesConnectionsProperties());
        }
        for (String triggerId : triggerStartStopInput.getTriggers()) {
            // NOTE(review): no null check on the resolved node config - confirm that
            // getNodeConfig never returns null for a listed trigger.
            triggersPool.stopTriggerById(getNodeConfig(triggerId).getInstanceId(), user);
        }
    }

    /**
     * Thread body of the manager: brings the execution live, parks until the thread is
     * interrupted, then shuts the manager down.
     *
     * <p>Start-up marks the execution {@code LIVE}, compiles runtime system properties, fails
     * iterations left in a running state, initialises the triggers and handles queued iterations.
     * The thread then waits on its own monitor in 5-minute slices until it is interrupted.
     *
     * <p>Any {@link Throwable} raised on the way is reported to the messages store, the execution
     * is marked {@code FAILED} and the error is logged. Shutdown ({@code stopManager}) and the
     * reset of the {@code stopped}/{@code started}/{@code startedFully} flags live in a single
     * {@code finally} block, so they run on every exit path.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        final long startTime = TextHelper.startTime();
        final WorkFlowExecution execution = this.base.getExecution();
        ExecutionStorage storage = null;
        try {
            storage = storage();
            if (!storage.isOpen()) {
                throw new Exception("Storage is not opened!");
            }
            Objects.requireNonNull(this.startInputs, "Input object can't be null!");
            Objects.requireNonNull(this.triggersPool, "Trigger pool can't be null!");
            WorkFlowDeployment deployment = getWorkFlowParent().deployment();
            if (execution.isProd()) {
                Objects.requireNonNull(deployment, "Production workflow should always have deployment not null!");
            }
            storage.getStatsStorage().refresh();
            execution.setStartedAt(Instant.now());
            execution.setStatus(WorkFlowExecutionStatus.LIVE);
            execution.setStartedBy(this.startedBy);
            execution.setRecoverable(true);
            this.propertiesCompiler.compileRuntimeSystemProperties();
            markRunningAsFailed();
            this.started = true;
            execution.updateAll();
            setStatusMessage("Manager started.");
            this.triggersPool.initTriggers(getStartedBy(), this.startInputs.autoDisableStartedTriggers, this.startInputs.startAllTriggers, execution.isProd() ? deployment.getStartTriggers() : this.startInputs.startedTriggers);
            logger().info("Started manager {}, Took: {}", this.startedBy.getUsername(), TextHelper.endTime(startTime));
            messagesStore().onNext(NodeProcessorMessage.buildExecutionInfo("Manager Started", this.startInputs.startMessage));
            startQueuedIterations(storage, this.startInputs.isStartQueued());
            this.startedFully = true;
            // Park here until the thread is interrupted; the 5-minute timeout only bounds each wait.
            synchronized (this) {
                try {
                    long millis = Duration.ofMinutes(5L).toMillis();
                    while (!isInterrupted()) {
                        wait(millis);
                    }
                } catch (InterruptedException e) {
                    log.trace("Thread interrupted");
                }
            }
        } catch (Throwable th) {
            messagesStore().onNext(NodeProcessorMessage.buildExecutionError("Run-time", "Workflow has shutdown unexpectedly: " + th.getMessage(), th));
            execution.setStatus(WorkFlowExecutionStatus.FAILED);
            setStatusMessage("Workflow has shutdown unexpectedly: " + th.getMessage());
            logger().error("Workflow has shutdown unexpectedly, id: {}", getWorkFlowParent().getId(), th);
        } finally {
            try {
                // Start-up may have failed before the storage handle was obtained.
                if (storage == null) {
                    storage = storage();
                }
                stopManager(storage, startTime, execution.getStatus().equals(WorkFlowExecutionStatus.FAILED));
            } catch (Exception e) {
                logger().error("Failed while closing the Manager, Error: {}", e.getMessage());
            }
            this.stopped = true;
            this.started = false;
            this.startedFully = false;
        }
    }

    /**
     * Loads the history clean-up settings from the workflow configuration.
     *
     * <p>Does nothing when automatic clean-up is switched off. Otherwise it stores the iteration
     * limit and the age threshold; a threshold shorter than one day is discarded (set to
     * {@code null}) with a warning, and a limit between 1 and 9 produces a warning.
     */
    private void initCleanUpJob() {
        if (!this.workFlowConfigs.getAutoCleanUpHistory().get().booleanValue()) {
            return;
        }
        this.cleanUpAfterLimit = this.workFlowConfigs.getAutoCleanUpLimit().get().intValue();
        this.cleanUpOlderThanDuration = this.workFlowConfigs.getAutoCleanUpHistoryAfter().get();
        if (this.cleanUpOlderThanDuration.toDays() < 1) {
            logger().warn("The cleanup is set to duration of {}, which is lower than 1 day, cleanup job is turned off automatically!", this.cleanUpOlderThanDuration);
            this.cleanUpOlderThanDuration = null;
        }
        boolean limitBelowMinimum = this.cleanUpAfterLimit > 0 && this.cleanUpAfterLimit < 10;
        if (limitBelowMinimum) {
            logger().warn("The cleanup after max exceeded need to be greater than 10, specified {}", Integer.valueOf(this.cleanUpAfterLimit));
        }
    }

    /**
     * Collects the instance ids of the given node configurations.
     *
     * @param list node configurations to read the ids from
     * @return a new mutable set holding each configuration's instance id
     */
    @NotNull
    private Set<String> getNodeIds(@NotNull List<NodeConfiguration<? extends AbstractNodeConfigurationData>> list) {
        Set<String> instanceIds = new HashSet<>();
        for (NodeConfiguration<? extends AbstractNodeConfigurationData> node : list) {
            instanceIds.add(node.getInstanceId());
        }
        return instanceIds;
    }

    /**
     * Replaces the manager's nodes, connections and global properties with the given input and
     * rebuilds the DAG graph. The whole update runs under {@code updateWorkFlowLock}.
     *
     * <ol>
     *   <li>Valid nodes are stored; in dev mode, nodes that are invalid only because of a
     *       {@code CONFIGURATION} validation cause are stored as well.</li>
     *   <li>Nodes that were not stored in this pass are removed together with every connection
     *       and destination entry that references them.</li>
     *   <li>Connections whose source and destination both exist are stored; connections absent
     *       from the input are removed from {@code connections} and also from
     *       {@code nodeDestinations}, so the two structures stay in agreement.</li>
     *   <li>Global properties are split into script and static maps, static ones are compiled,
     *       and the DAG graph is cleared, refilled and re-initialised.</li>
     * </ol>
     *
     * @param nodesConnectionsPropertiesInput the full set of nodes, connections and global properties
     */
    public void updateNodesConnectionsProperties(@NotNull NodesConnectionsPropertiesInput nodesConnectionsPropertiesInput) {
        this.updateWorkFlowLock.lock();
        try {
            long startTime = TextHelper.startTime();
            if (logger().isTraceEnabled()) {
                logger().trace("Adding nodes and connections from input ...");
            }
            Set<String> keptNodeIds = new HashSet<>();
            for (NodeConfiguration<? extends AbstractNodeConfigurationData> nodeConfiguration : nodesConnectionsPropertiesInput.getNodes()) {
                boolean isValid = nodeConfiguration.getUi().isValid();
                boolean isDev = isDev();
                boolean invalidByConfigurationOnly = !isValid && nodeConfiguration.getUi().getValidationCause().getType().equals(NodeValidationCauseType.CONFIGURATION);
                if (isValid || (isDev && invalidByConfigurationOnly)) {
                    updateSingleNode(nodeConfiguration, false);
                    keptNodeIds.add(nodeConfiguration.getInstanceId());
                }
            }
            // Drop nodes that are no longer part of the input, along with everything that points at them.
            this.nodes.entrySet().removeIf(nodeEntry -> {
                String nodeId = nodeEntry.getKey();
                if (keptNodeIds.contains(nodeId)) {
                    return false;
                }
                this.nodeDestinations.removeAll(nodeId);
                this.connections.entrySet().removeIf(connectionEntry ->
                        connectionEntry.getValue().getDestinationId().equals(nodeId) || connectionEntry.getValue().getSourceId().equals(nodeId));
                this.nodeDestinations.entries().removeIf(destinationEntry ->
                        destinationEntry.getValue().getDestinationId().equals(nodeId));
                logger().info("Removed Node {} configs/connections from the storage.", nodeId);
                return true;
            });
            // Element type is Object so the set accepts whatever type getUuid() returns.
            Set<Object> keptConnectionIds = new HashSet<>();
            for (NodesConnection<? extends AbstractNodeConnectionData> nodesConnection : nodesConnectionsPropertiesInput.getConnections()) {
                if (this.nodes.containsKey(nodesConnection.getSourceId()) && this.nodes.containsKey(nodesConnection.getDestinationId())) {
                    keptConnectionIds.add(nodesConnection.getUuid());
                    this.connections.put(nodesConnection.getUuid(), nodesConnection);
                    if (!this.nodeDestinations.containsEntry(nodesConnection.getSourceId(), nodesConnection)) {
                        this.nodeDestinations.put(nodesConnection.getSourceId(), nodesConnection);
                    }
                }
            }
            this.connections.entrySet().removeIf(connectionEntry -> !keptConnectionIds.contains(connectionEntry.getKey()));
            // A connection dropped from the input must also disappear from the per-source destination
            // index, otherwise getNodeDestinations() keeps returning a connection the graph no longer has.
            this.nodeDestinations.entries().removeIf(destinationEntry ->
                    !keptConnectionIds.contains(destinationEntry.getValue().getUuid()));
            this.globalStaticProperties.clear();
            this.globalScriptProperties.clear();
            for (PropertyInput propertyInput : nodesConnectionsPropertiesInput.getGlobalProperties()) {
                if (propertyInput.isScript()) {
                    this.globalScriptProperties.put(propertyInput.getId(), propertyInput);
                } else {
                    this.globalStaticProperties.put(propertyInput.getId(), propertyInput);
                }
            }
            this.propertiesCompiler.compileGlobalStaticProperties(this.globalStaticProperties);
            logger().trace("Building graph state ...");
            this.dagGraph.clear();
            fillDagGraph(this.dagGraph);
            this.dagGraph.init(this.dagGraphPrint);
            if (logger().isTraceEnabled()) {
                logger().trace("Finished building graph state {}", this.dagGraphPrint.getPrinter());
                logger().trace("Finished updating nodes ({}) and connections ({}) to storage, Took: {}", this.nodes.size(), this.connections.size(), TextHelper.endTime(startTime));
            }
        } finally {
            this.updateWorkFlowLock.unlock();
        }
    }

    /**
     * Stores a node configuration under its instance id.
     *
     * <p>Unless {@code z} is set, the write is skipped when a configuration is already stored for
     * that id, the incoming revision is positive and both revisions are equal. A failure while
     * storing is logged and not propagated.
     *
     * @param nodeConfiguration the configuration to store
     * @param z when {@code true}, the revision comparison is bypassed and the node is always stored
     */
    public void updateSingleNode(@NotNull NodeConfiguration<? extends AbstractNodeConfigurationData> nodeConfiguration, boolean z) {
        NodeConfiguration<? extends AbstractNodeConfigurationData> existing = this.nodes.get(nodeConfiguration.getInstanceId());
        boolean sameRevision = !z
                && existing != null
                && nodeConfiguration.getRevision() > 0
                && existing.getRevision() == nodeConfiguration.getRevision();
        if (sameRevision) {
            logger().trace("Node {} revision did not changed from {}", nodeConfiguration.getInstanceId(), Integer.valueOf(nodeConfiguration.getRevision()));
            return;
        }
        try {
            this.nodes.put(nodeConfiguration.getInstanceId(), nodeConfiguration);
        } catch (Exception storeFailure) {
            logger().error("Config load {}", nodeConfiguration.getInstanceId(), storeFailure);
        }
    }

    /**
     * Populates the given DAG from the manager's current nodes and connections.
     *
     * <p>Each connection whose source and destination nodes both exist becomes a dependency edge.
     * Nodes that take part in no such connection and whose node type has no input ports are
     * added as independent vertices. A failure while inspecting a single node is logged and that
     * node is skipped, so one broken configuration does not stop the graph from being built.
     *
     * @param defaultDag the graph to fill; it is not cleared here
     */
    private void fillDagGraph(DefaultDag<String, NodeResults> defaultDag) {
        Set<String> connectedNodeIds = ConcurrentHashMap.newKeySet();
        for (NodesConnection<? extends AbstractNodeConnectionData> nodesConnection : this.connections.values()) {
            if (this.nodes.containsKey(nodesConnection.getSourceId()) && this.nodes.containsKey(nodesConnection.getDestinationId())) {
                defaultDag.addDependency(nodesConnection.getSourceId(), nodesConnection.getDestinationId());
                connectedNodeIds.add(nodesConnection.getSourceId());
                connectedNodeIds.add(nodesConnection.getDestinationId());
                logger().trace("Finished init connection {}", nodesConnection);
            }
        }
        for (NodeConfiguration<? extends AbstractNodeConfigurationData> nodeConfiguration : this.nodes.values()) {
            // Captured before the checks so the catch block can name the node without calling into it again.
            String nodeId = null;
            try {
                nodeId = nodeConfiguration.getInstanceId();
                if (!connectedNodeIds.contains(nodeId) && !nodeConfiguration.getNodeType().isHasInputPorts()) {
                    defaultDag.addIndependent(nodeId);
                }
            } catch (Exception e) {
                // Still best-effort, but no longer silent.
                logger().warn("Failed to add node {} to the graph as an independent node, skipping it.", nodeId, e);
            }
        }
    }

    /**
     * Builds a fresh DAG from the manager's current nodes and connections.
     *
     * @return a new graph instance filled by {@code fillDagGraph}
     */
    public DefaultDag<String, NodeResults> createDagCopy() {
        final DefaultDag<String, NodeResults> copy = new DefaultDag<>();
        fillDagGraph(copy);
        return copy;
    }

    /**
     * Looks up the stored configuration of a node.
     *
     * @param str the node instance id used as key in the nodes map
     * @param <T> the configuration data type the caller expects; the cast is unchecked
     * @return the stored configuration, or {@code null} when the map has no entry for the id
     */
    @SuppressWarnings("unchecked")
    public <T extends AbstractNodeConfigurationData> NodeConfiguration<T> getNodeConfig(String str) {
        NodeConfiguration<? extends AbstractNodeConfigurationData> stored = this.nodes.get(str);
        return (NodeConfiguration<T>) stored;
    }

    /**
     * Returns the graph data that the graph printer holds for a node.
     *
     * @param str the node id to look up
     * @return whatever {@code dagGraphPrint.getNodeData} returns for that id
     */
    public NodeGraphData getNodeGraphData(String str) {
        final NodeGraphData graphData = this.dagGraphPrint.getNodeData(str);
        return graphData;
    }

    /**
     * Asks the graph printer whether two nodes share a destination.
     *
     * @param str the first node id
     * @param str2 the second node id
     * @return the answer given by {@code dagGraphPrint.doShareDestination}
     */
    public boolean doShareDestination(String str, String str2) {
        final boolean shared = this.dagGraphPrint.doShareDestination(str, str2);
        return shared;
    }

    /**
     * Returns the dependency data of a node as reported by its graph data.
     *
     * @param str the node id
     * @return the result of {@code getDependenciesData(dagGraphPrint, false)} on the node's graph data
     */
    public List<String> getPreviousNodes(String str) {
        final NodeGraphData graphData = this.dagGraphPrint.getNodeData(str);
        return graphData.getDependenciesData(this.dagGraphPrint, false);
    }

    /**
     * Returns the dependencies recorded in a node's graph data.
     *
     * @param str the node id
     * @return the set returned by {@code getDependencies()} on the node's graph data
     */
    public Set<String> getPreviousOneLevel(String str) {
        final NodeGraphData graphData = this.dagGraphPrint.getNodeData(str);
        return graphData.getDependencies();
    }

    /**
     * Asks the graph printer whether two nodes are connected.
     *
     * @param str the first node id
     * @param str2 the second node id
     * @return the answer given by {@code dagGraphPrint.isConnectedNodes}
     */
    public boolean isConnectedNodes(@NotNull String str, String str2) {
        final boolean connected = this.dagGraphPrint.isConnectedNodes(str, str2);
        return connected;
    }

    /**
     * Returns the connections registered under the given source node id.
     *
     * @param str the source node id used as key in {@code nodeDestinations}
     * @return the collection that {@code nodeDestinations.get} returns for that key
     */
    public Collection<NodesConnection<? extends AbstractNodeConnectionData>> getNodeDestinations(String str) {
        final Collection<NodesConnection<? extends AbstractNodeConnectionData>> outgoing = this.nodeDestinations.get(str);
        return outgoing;
    }

    /**
     * @return the {@code isRunning()} flag of the execution held by {@code base}
     */
    public boolean isRunning() {
        final WorkFlowExecution execution = this.base.getExecution();
        return execution.isRunning();
    }

    /**
     * @return the {@code isLoading()} flag of the execution held by {@code base}
     */
    public boolean isLoading() {
        final WorkFlowExecution execution = this.base.getExecution();
        return execution.isLoading();
    }

    /**
     * Returns the logger of the current execution, falling back to the class-level {@code log}
     * when obtaining it throws an {@link Exception}.
     *
     * @return the execution logger, or {@code log} as fallback
     */
    public Logger logger() {
        Logger resolved;
        try {
            resolved = this.base.getExecution().logger();
        } catch (Exception ignored) {
            // The execution logger could not be obtained; use the class logger instead.
            resolved = log;
        }
        return resolved;
    }

    /**
     * Marks every iteration reported by {@code getRunningIterations} for this execution as failed.
     * Any exception raised while doing so is logged and ignored.
     */
    private void markRunningAsFailed() {
        try {
            final WorkFlowExecution execution = this.base.getExecution();
            WorkFlowExecutionIteration.getRunningIterations(execution).forEach(stuckIteration -> {
                final String reason = "Found " + stuckIteration.getStatus() + ", marking it as failed";
                markAsFailed(stuckIteration, reason);
            });
        } catch (Exception failure) {
            logger().error("Failed while marking running stuck iterations, ignoring error.", failure);
        }
    }

    /**
     * Handles the iterations that are still queued for this execution when the manager starts.
     *
     * <p>Returns immediately when there are none. Otherwise an execution info message is published
     * and each queued iteration is treated as follows:
     * <ul>
     *   <li>already active or queued in the iterations pool: logged and left alone;</li>
     *   <li>{@code z} is {@code false}: marked as failed;</li>
     *   <li>no stored start input for the iteration: marked as failed and logged;</li>
     *   <li>otherwise: started asynchronously; a failure to start marks it as failed.</li>
     * </ul>
     *
     * @param executionStorage storage used to read each iteration's start input
     * @param z whether queued iterations should be started automatically
     */
    private void startQueuedIterations(ExecutionStorage executionStorage, boolean z) {
        List<WorkFlowExecutionIteration> queued = WorkFlowExecutionIteration.getQueuedIterations(this.base.getExecution());
        if (queued.isEmpty()) {
            return;
        }
        messagesStore().onNext(NodeProcessorMessage.buildExecutionInfo("Manager Started", "Found " + queued.size() + " Queued iterations auto Start: " + z));
        for (WorkFlowExecutionIteration iteration : queued) {
            if (this.iterationsPool.isActive(iteration.getId()) || this.iterationsPool.isQueued(iteration.getId())) {
                logger().warn("Found queued iteration {}:{}, but it's already running, ignoring it.", iteration.getId(), Long.valueOf(iteration.getIteration()));
                continue;
            }
            if (!z) {
                markAsFailed(iteration, "The user " + this.startInputs.startedByUser.getUsername() + " specified not to auto start queued iterations, marking it as failed.");
                continue;
            }
            IterationStartInput startInput = executionStorage.getIterationStartInputStorage().get(iteration.getIteration());
            if (startInput == null) {
                markAsFailed(iteration, "Can't find start input's for iteration " + iteration.getIteration());
                logger().error("Can't find start input's for iteration {}", Long.valueOf(iteration.getIteration()));
                continue;
            }
            try {
                logger().info("Recovering Iteration {}:{} that was Queued and running it now ...", iteration.getId(), Long.valueOf(iteration.getIteration()));
                messagesStore().onNext(NodeProcessorMessage.buildIterationInfo(iteration.getId(), "Start Iteration Process", "Recovering Iteration that was Queued and running it now ..."));
                this.iterationsPool.startAsync(iteration, startInput, true);
            } catch (Exception startFailure) {
                logger().error("Can't start iteration {}", Long.valueOf(iteration.getIteration()), startFailure);
                markAsFailed(iteration, "Found stuck with status: " + iteration.getStatus() + ", but failed to be started: " + startFailure.getMessage());
            }
        }
    }

    /**
     * Marks an iteration as failed and records why.
     *
     * <p>Publishes an iteration error message, stores {@code str} as the iteration's progress
     * message, sets its status to {@code FAILED} and persists it. Nodes of the iteration that are
     * still {@code RUNNING} or {@code STOPPING} are set to {@code FAILED} and updated as well.
     *
     * <p>The closing log line reports the status the iteration had <em>before</em> it was
     * overwritten; reading it afterwards would always yield {@code FAILED}.
     *
     * @param workFlowExecutionIteration the iteration to fail
     * @param str the reason, used both for the published message and the progress message
     */
    private void markAsFailed(@NotNull WorkFlowExecutionIteration workFlowExecutionIteration, String str) {
        // Capture the status that was found before it is replaced below.
        final String previousStatus = String.valueOf(workFlowExecutionIteration.getStatus());
        messagesStore().onNext(NodeProcessorMessage.buildIterationError(workFlowExecutionIteration.getId(), "Auto Start", str, null));
        workFlowExecutionIteration.getData().setProgressMsg(str);
        workFlowExecutionIteration.setStatus(WorkFlowExecutionIterationStatus.FAILED);
        workFlowExecutionIteration.updateNow();
        workFlowExecutionIteration.getNodes().forEach(workFlowExecutionNode -> {
            if (workFlowExecutionNode.getStatus().equals(ExecutionNodeStatus.RUNNING) || workFlowExecutionNode.getStatus().equals(ExecutionNodeStatus.STOPPING)) {
                workFlowExecutionNode.setStatus(ExecutionNodeStatus.FAILED);
                workFlowExecutionNode.getState().setProgressMessage("Found status running, setting it as failed");
                workFlowExecutionNode.update();
            }
        });
        logger().warn("Found status: {} iteration {}:{}, Changed its message: {}", previousStatus, workFlowExecutionIteration.getId(), Long.valueOf(workFlowExecutionIteration.getIteration()), workFlowExecutionIteration.getData().getProgressMsg());
    }

    /**
     * @return the storage that the factory's storage loader provides for the current execution
     */
    public ExecutionStorage storage() {
        final WorkFlowExecution execution = this.base.getExecution();
        return this.workFlowFactory.getStorageLoader().get(execution);
    }

    /**
     * @return the messages store of the current execution storage
     */
    public MessagesStorage messagesStore() {
        final ExecutionStorage executionStorage = storage();
        return executionStorage.getMessagesStore();
    }

    /**
     * @return the stats storage of the current execution storage
     */
    public StatsStorage statsStorage() {
        final ExecutionStorage executionStorage = storage();
        return executionStorage.getStatsStorage();
    }

    /**
     * Shuts the manager down: flags the execution as {@code STOPPING}, shuts down the triggers and
     * iterations pools, pauses for 200 ms, finalises the execution state and publishes a
     * "Manager Stopped" message. Any {@link Throwable} raised on the way is logged, never rethrown.
     *
     * @param executionStorage storage handed on to {@code finalizeStopping}
     * @param j start timestamp of the manager, used to report how long it was online
     * @param z whether the execution should be finalised as failed
     */
    private void stopManager(ExecutionStorage executionStorage, long j, boolean z) {
        final long stopStartedAt = TextHelper.startTime();
        try {
            final WorkFlowExecution execution = this.base.getExecution();
            logger().info("Stopping execution manager ...");
            execution.setStatus(WorkFlowExecutionStatus.STOPPING);
            execution.updateAll();
            this.stopping = true;
            this.triggersPool.shutdownTriggers(this.stopInputs.force);
            this.iterationsPool.shutdown(this.stopInputs.force);
            try {
                try {
                    Thread.sleep(200L);
                } catch (Exception pauseFailure) {
                    logger().warn("Save failed, might be another job already saved this execution {}", pauseFailure.getMessage());
                }
            } catch (Exception ignored) {
                // Only reachable if reporting the failure above itself fails; nothing more to do.
            }
            finalizeStopping(executionStorage, z);
            final MessagesStorage messages = messagesStore();
            final String stoppedSummary = String.format("%s (Online For: %s)", this.stopInputs.stopMessage, TextHelper.endTime(j));
            messages.onNext(NodeProcessorMessage.buildExecutionInfo("Manager Stopped", stoppedSummary));
            logger().info("WorkFlow Manager stopped successfully, Cause: {} (Took {})", this.stopInputs.getStopMessage(), TextHelper.endTime(stopStartedAt));
        } catch (Throwable failure) {
            logger().error("Process Manager failed to stop", failure);
        }
    }

    /**
     * @return {@code true} when the manager is stopping and the stop inputs request a forced stop
     */
    public boolean isForceStopping() {
        if (!this.stopping) {
            return false;
        }
        return this.stopInputs.force;
    }

    /**
     * Writes the final state of the execution after a stop.
     *
     * <p>The end time is always set. With {@code z} the status becomes {@code FAILED}. Without it,
     * an execution that is not already {@code FAILED} becomes {@code OFFLINE} and takes over the
     * recoverable/paused flags from the stop inputs; an already failed execution keeps its status.
     * Stats are refreshed and the execution is persisted in every case.
     *
     * @param executionStorage not used by this method
     * @param z whether the execution must be finalised as failed
     */
    private void finalizeStopping(ExecutionStorage executionStorage, boolean z) {
        final WorkFlowExecution execution = this.base.getExecution();
        execution.setEndAt(Instant.now());
        if (z) {
            execution.setStatus(WorkFlowExecutionStatus.FAILED);
        } else {
            final boolean alreadyFailed = execution.getStatus().equals(WorkFlowExecutionStatus.FAILED);
            if (!alreadyFailed) {
                execution.setStatus(WorkFlowExecutionStatus.OFFLINE);
                execution.setRecoverable(this.stopInputs.recoverable);
                execution.getData().setPaused(this.stopInputs.markPaused);
                setStatusMessage("Manager stopped successfully.");
            }
        }
        setUpdatedStats();
        execution.updateAll();
    }

    /**
     * Closes the logger of the current execution and traces the removal.
     *
     * @throws IOException declared by this method; nothing is caught here
     */
    public void remove() throws IOException {
        final WorkFlowExecution execution = this.base.getExecution();
        execution.closeLogger();
        log.trace("Removed {}", key());
    }

    /**
     * Stores a status message in the data of the current execution. Synchronized on this manager.
     *
     * @param str the status message to store
     */
    public synchronized void setStatusMessage(String str) {
        final WorkFlowExecution execution = this.base.getExecution();
        execution.getData().setStatusMessage(str);
    }

    /**
     * Updates the execution statistics from an iteration-finish event.
     *
     * <p>An event without status changes no counter. A {@code COMPLETED} or {@code FAILED} status
     * increments the matching counter, and a positive total size is added to the data-size
     * counter. {@code setUpdatedStats()} runs in every case, including when an exception is thrown.
     *
     * @param iterationFinishStateChangeEvent the event carrying the final status and total size
     */
    public synchronized void onNodeProgressUpdate(@NotNull IterationFinishStateChangeEvent iterationFinishStateChangeEvent) {
        try {
            if (iterationFinishStateChangeEvent.getStatus() == null) {
                // Nothing to count; the finally block still refreshes the stats.
                return;
            }
            if (iterationFinishStateChangeEvent.getStatus().equals(WorkFlowExecutionIterationStatus.COMPLETED)) {
                statsStorage().getCompleted().incrementAndGet();
            } else if (iterationFinishStateChangeEvent.getStatus().equals(WorkFlowExecutionIterationStatus.FAILED)) {
                statsStorage().getFailed().incrementAndGet();
            }
            if (iterationFinishStateChangeEvent.getTotalSize() > 0) {
                statsStorage().getDataSizeBytes().addAndGet(iterationFinishStateChangeEvent.getTotalSize());
            }
        } finally {
            setUpdatedStats();
        }
    }

    /**
     * Blocks the calling thread until the manager reports that it has fully started.
     *
     * <p>The started flag is polled with a pause of up to 200 ms between checks. The pause uses
     * {@code wait} inside a {@code synchronized} block: {@link Object#wait(long)} may only be
     * called by the owner of the monitor, and without it every call failed immediately with an
     * {@link IllegalMonitorStateException} that was swallowed, turning this loop into a busy spin.
     *
     * @throws RuntimeException if the manager is stopped while waiting, or if it has not fully
     *     started within one minute
     */
    public void waitForStartFinished() {
        final Duration timeout = Duration.ofMinutes(1L);
        final Instant deadline = Instant.now().plus(timeout);
        while (!isStartedFully()) {
            if (isStopped()) {
                throw new RuntimeException("Manager " + key() + " stopped while waiting for it to start.");
            }
            synchronized (this) {
                try {
                    wait(200L);
                } catch (InterruptedException ignored) {
                    // Best effort, as before: keep polling; the loop is bounded by the deadline.
                }
            }
            if (deadline.isBefore(Instant.now())) {
                throw new RuntimeException("Reached timeout of " + timeout + " while waiting for the manager " + key() + " to start.");
            }
        }
    }

    /**
     * Blocks the calling thread until the manager thread has stopped.
     *
     * <p>If the manager thread is not alive there is nothing to wait for and the execution is
     * finalised directly. Otherwise the {@code stopped} flag is polled, waiting up to 200 ms on
     * this manager's monitor between checks, until it becomes {@code true} or the configured
     * stopping timeout has elapsed.
     *
     * <p>The loop now ends as soon as {@code stopped} is set. Previously it kept running until
     * the timeout and then always threw, so a clean stop was reported as a timeout.
     *
     * @param executionStorage storage handed to {@code finalizeStopping} when the thread is not alive
     * @throws Exception if the stopping timeout elapses before the manager has stopped
     */
    public void waitForStopFinish(ExecutionStorage executionStorage) throws Exception {
        long startTime = TextHelper.startTime();
        if (!isAlive()) {
            logger().debug("The thread is not alive, we can't wait for shutdown, marking it as stopped.");
            finalizeStopping(executionStorage, false);
            return;
        }
        WorkFlowConfigs workFlowConfigs = getWorkFlowConfigs();
        Instant waitStartedAt = Instant.now();
        synchronized (this) {
            while (!this.stopped) {
                try {
                    wait(200L);
                } catch (InterruptedException ignored) {
                    // Best effort, as before: keep polling; the loop is bounded by the timeout below.
                }
                if (waitStartedAt.plus((TemporalAmount) workFlowConfigs.getStoppingTimeOut().get()).isBefore(Instant.now())) {
                    messagesStore().onNext(NodeProcessorMessage.buildExecutionError("Manager Stopped", "Reached timeout while waiting for the manager to shutdown.", null));
                    throw new Exception("Reached timeout while waiting for the manager to shutdown.");
                }
            }
        }
        if (logger().isTraceEnabled()) {
            logger().trace("waitForStopFinish completed took: {}.", TextHelper.endTime(startTime));
        }
    }

    /**
     * Runs the clean-up job for this execution.
     *
     * <p>A request arriving while {@code cleaningUp} is set is skipped with a warning. With
     * {@code z}, iterations are deleted in batches of up to 500: first those older than the
     * configured duration (when one is set), then those beyond the configured limit (when the
     * limit is at least 10). Each phase ends when the manager is stopping or stopped, when a batch
     * comes back smaller than 500, or when the shared deletion counter reaches 15000. The storage's
     * own {@code cleanUp()} runs afterwards and the execution is published.
     *
     * <p>Failures are logged and published as a warning message rather than propagated, and the
     * {@code cleaningUp} flag is always reset.
     *
     * @param z whether old iterations should be deleted before the storage clean-up
     * @throws ExecutionException declared by this method
     */
    public void cleanUp(boolean z) throws ExecutionException {
        if (this.cleaningUp) {
            log.warn("Already cleaning up, skipping this request.");
            return;
        }
        final int batchLimit = 500;
        final int maxDeletedPerRun = 15000;
        try {
            WorkFlowExecution execution = this.base.getExecution();
            this.cleaningUp = true;
            execution.updateOnly();
            ExecutionStorage executionStorage = storage();
            if (z) {
                // One counter for both phases: it feeds the per-run cap and the summary messages.
                AtomicInteger deleted = new AtomicInteger();
                if (this.cleanUpOlderThanDuration != null) {
                    long agePhaseStart = TextHelper.startTime();
                    int fetched;
                    do {
                        List<WorkFlowExecutionIteration> expired = WorkFlowExecutionIteration.olderThanIterations(execution, this.cleanUpOlderThanDuration, batchLimit);
                        fetched = expired.size();
                        if (!expired.isEmpty()) {
                            logger().debug("Cleaning up {} iterations, older than {}", Integer.valueOf(expired.size()), this.cleanUpOlderThanDuration);
                            cleanIterationsList(deleted, expired, executionStorage);
                        }
                    } while (!isStopping() && !isStopped() && fetched >= batchLimit && deleted.get() < maxDeletedPerRun);
                    if (deleted.get() > 0) {
                        messagesStore().onNext(NodeProcessorMessage.buildExecutionInfo("Cleanup Job", "Finished cleaning " + deleted.get() + " iterations, Older than: " + this.cleanUpOlderThanDuration + ", Took: " + TextHelper.endTime(agePhaseStart)));
                    }
                }
                if (this.cleanUpAfterLimit >= 10) {
                    long limitPhaseStart = TextHelper.startTime();
                    int fetched;
                    do {
                        List<WorkFlowExecutionIteration> overflow = WorkFlowExecutionIteration.greaterThanLimit(execution, this.cleanUpAfterLimit, batchLimit);
                        fetched = overflow.size();
                        if (!overflow.isEmpty()) {
                            logger().debug("Cleaning up {} iterations, limit {} ...", Integer.valueOf(overflow.size()), Integer.valueOf(this.cleanUpAfterLimit));
                            cleanIterationsList(deleted, overflow, executionStorage);
                        }
                    } while (!isStopping() && !isStopped() && fetched >= batchLimit && deleted.get() < maxDeletedPerRun);
                    if (deleted.get() > 0) {
                        messagesStore().onNext(NodeProcessorMessage.buildExecutionInfo("Cleanup Job", "Finished cleaning " + deleted.get() + " iterations, After Limit: " + this.cleanUpAfterLimit + ", Took: " + TextHelper.endTime(limitPhaseStart)));
                    }
                }
            }
            executionStorage.cleanUp();
            ExecutionChangePublisher.publishExecution(this.base.getExecution());
        } catch (Throwable failure) {
            logger().error("Failed while running cleanup, will continue after restart or next periodic run.", failure);
            messagesStore().onNext(NodeProcessorMessage.buildExecutionWarning("Cleanup Job", "Failed while running cleanup, will continue after restart or next periodic run, error: " + failure.getMessage()));
        } finally {
            this.cleaningUp = false;
        }
    }

    /**
     * Deletes the given iterations through a parallel stream, skipping those still running.
     *
     * @param atomicInteger counter incremented once per deleted iteration
     * @param list the iterations to delete
     * @param executionStorage storage passed to each iteration's {@code deleteIteration}
     */
    private void cleanIterationsList(@NotNull AtomicInteger atomicInteger, @NotNull List<WorkFlowExecutionIteration> list, @NotNull ExecutionStorage executionStorage) {
        final long batchStart = TextHelper.startTime();
        list.stream().parallel().forEach(iteration -> {
            if (!iteration.isRunning()) {
                iteration.deleteIteration(executionStorage, logger());
                atomicInteger.getAndIncrement();
                return;
            }
            logger().warn("Found running iteration {}:{} while cleaning up, skipping it from cleanup.", iteration.getId(), Long.valueOf(iteration.getIteration()));
        });
        logger().info("Finished cleaning {} iterations, Max Limit: {}, Cleanup After Date: {}, Took: {}", Integer.valueOf(atomicInteger.get()), Integer.valueOf(this.cleanUpAfterLimit), this.cleanUpOlderThanDuration, TextHelper.endTime(batchStart));
    }

    /**
     * @return the string form of {@code base}
     */
    @Override // java.lang.Thread
    public String toString() {
        final String description = this.base.toString();
        return description;
    }

    /** @return the workflow factory held by this manager */
    public WorkFlowFactory getWorkFlowFactory() {
        return workFlowFactory;
    }

    /** @return the actor system held by this manager */
    public ActorSystem getActorSystem() {
        return actorSystem;
    }

    /** @return the workflow base this manager operates on */
    public WorkFlowBase getBase() {
        return base;
    }

    /** @return the user stored in {@code startedBy} */
    public User getStartedBy() {
        return startedBy;
    }

    /** @return the triggers thread pool of this manager */
    public TriggersThreadPool getTriggersPool() {
        return triggersPool;
    }

    /** @return the iterations thread pool of this manager */
    public IterationsThreadPool getIterationsPool() {
        return iterationsPool;
    }

    /** @return the live map of node configurations (the map itself, not a copy) */
    public ConcurrentHashMap<String, NodeConfiguration<? extends AbstractNodeConfigurationData>> getNodes() {
        return nodes;
    }

    /** @return the live map of global script properties (the map itself, not a copy) */
    public ConcurrentHashMap<String, PropertyInput> getGlobalScriptProperties() {
        return globalScriptProperties;
    }

    /** @return the current value of the {@code loaded} flag */
    public boolean isLoaded() {
        return loaded;
    }

    /** @return the current value of the {@code stopping} flag */
    public boolean isStopping() {
        return stopping;
    }

    /** @return the current value of the {@code stopped} flag */
    public boolean isStopped() {
        return stopped;
    }

    /** @return the current value of the {@code started} flag */
    public boolean isStarted() {
        return started;
    }

    /** @return the current value of the {@code startedFully} flag */
    public boolean isStartedFully() {
        return startedFully;
    }

    /** @return the value of the {@code recoverRetries} field */
    public int getRecoverRetries() {
        return recoverRetries;
    }

    /** @return the start inputs currently set on this manager */
    public StartInputs getStartInputs() {
        return startInputs;
    }

    /**
     * Replaces the start inputs of this manager.
     *
     * @param startInputs the new start inputs
     */
    @JsonIgnore
    public void setStartInputs(StartInputs startInputs) {
        this.startInputs = startInputs;
    }

    /** @return the stop inputs currently set on this manager */
    public StopInputs getStopInputs() {
        return stopInputs;
    }

    /**
     * Replaces the stop inputs of this manager.
     *
     * @param stopInputs the new stop inputs
     */
    @JsonIgnore
    public void setStopInputs(StopInputs stopInputs) {
        this.stopInputs = stopInputs;
    }

    /** @return the workflow configuration object held by this manager */
    public WorkFlowConfigs getWorkFlowConfigs() {
        return workFlowConfigs;
    }

    /** @return the workflow compiler held by this manager */
    public WorkFlowCompiler getCompiler() {
        return compiler;
    }

    /** @return the properties compiler held by this manager */
    public PropertiesCompiler getPropertiesCompiler() {
        return propertiesCompiler;
    }
}
