package com.nazdaq.workflow.graphql.resolvers.subscriptions.execution.publishers;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.nazdaq.core.helpers.AppConfig;
import com.nazdaq.noms.app.auth.AbstractGraphQLPublisher;
import com.nazdaq.workflow.engine.core.manager.WorkFlowFactory;
import com.nazdaq.workflow.engine.core.storage.models.messages.NodeProcessorMessage;
import com.nazdaq.workflow.graphql.models.execution.PaginationDirection;
import graphql.execution.ExecutionId;
import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import models.workflow.executions.WorkFlowExecution;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nazdaq/workflow/graphql/resolvers/subscriptions/execution/publishers/MessagesChangePublisher.class */
/**
 * GraphQL subscription publisher that streams {@link NodeProcessorMessage}s of one workflow
 * execution to a subscriber.
 *
 * <p>Every instance registers itself in a static registry keyed by the workflow execution id.
 * {@link #publishMessage(String, NodeProcessorMessage)} fans a message out to every instance
 * registered for that id; each instance collects incoming messages into
 * {@value #BUFFER_WINDOW_MS} ms batches and hands non-empty batches to the inherited
 * {@code pushUpdate}. Shortly after construction a page of already stored messages is pushed as
 * well.
 *
 * <p>Thread-safety: the registry is a plain {@code ArrayListMultimap}, so every access to it is
 * guarded by {@code synchronized (publishers)}.
 */
public class MessagesChangePublisher extends AbstractGraphQLPublisher<List<NodeProcessorMessage>> {
    private static final Logger log = LoggerFactory.getLogger(MessagesChangePublisher.class);

    /** Length of the window, in milliseconds, over which live messages are batched. */
    private static final long BUFFER_WINDOW_MS = 800L;

    /** Delay, in milliseconds, before the stored messages are pushed after construction. */
    private static final long INIT_DELAY_MS = 50L;

    /** Registry of live publishers keyed by workflow execution id; guarded by its own monitor. */
    private static final Multimap<String, MessagesChangePublisher> publishers = ArrayListMultimap.create();

    private final WorkFlowFactory workFlowFactory;
    /** Id of the workflow execution (not the GraphQL {@link ExecutionId}) this publisher serves. */
    private final String executionId;
    private final PublishSubject<NodeProcessorMessage> messagesPublisher;
    private final Disposable disposeMessagesPublisher;
    private final Disposable disposeSendOnInit;

    /**
     * Creates a publisher for the given workflow execution and registers it so that
     * {@link #publishMessage(String, NodeProcessorMessage)} can reach it.
     *
     * @param workFlowFactory   factory used to reach the message store of the execution
     * @param executionId       GraphQL execution id, passed to the superclass
     * @param workFlowExecution workflow execution whose messages are streamed
     * @param i                 limit passed to the message store when loading the initial page
     * @return the newly registered publisher
     */
    @NotNull
    public static MessagesChangePublisher create(WorkFlowFactory workFlowFactory, ExecutionId executionId, WorkFlowExecution workFlowExecution, int i) {
        MessagesChangePublisher publisher = new MessagesChangePublisher(workFlowFactory, executionId, workFlowExecution, i);
        int total;
        synchronized (publishers) {
            publishers.put(workFlowExecution.getId(), publisher);
            total = publishers.size();
        }
        log.trace("Added Publisher ExecutionId: {}, TransactionId: {}, Total: {}", workFlowExecution.getId(), executionId, total);
        return publisher;
    }

    private MessagesChangePublisher(WorkFlowFactory workFlowFactory, ExecutionId executionId, @NotNull WorkFlowExecution workFlowExecution, int i) {
        // No executor and zero intervals are handed to the superclass.
        // NOTE(review): presumably this disables periodic polling because updates are pushed
        // explicitly - confirm against AbstractGraphQLPublisher.
        super(executionId, (ScheduledExecutorService) null, 0L, 0L, TimeUnit.MILLISECONDS);
        this.workFlowFactory = workFlowFactory;
        this.executionId = workFlowExecution.getId();
        this.messagesPublisher = PublishSubject.create();
        this.disposeMessagesPublisher = this.messagesPublisher
                .observeOn(Schedulers.io())
                .subscribeOn(Schedulers.io())
                .buffer(BUFFER_WINDOW_MS, TimeUnit.MILLISECONDS)
                .subscribe(batch -> {
                    // buffer() emits an (empty) list for every window, even when nothing arrived.
                    if (batch.isEmpty() || AppConfig.shuttingDown) {
                        return;
                    }
                    pushMessages(batch);
                }, th -> {
                    log.error("Error", th);
                });
        this.disposeSendOnInit = addInitData(workFlowExecution, i);
    }

    /**
     * Forwards a message to every publisher registered for the given workflow execution id.
     * Does nothing when no publisher is registered for the id or the application is shutting
     * down.
     *
     * @param str                  workflow execution id the message belongs to
     * @param nodeProcessorMessage message to forward
     */
    public static void publishMessage(String str, @NotNull NodeProcessorMessage nodeProcessorMessage) {
        if (AppConfig.shuttingDown) {
            return;
        }
        // Snapshot under the lock so a concurrent create()/onClose() cannot invalidate the
        // iteration, and so no subscriber code runs while the registry lock is held.
        // Multimap.get returns an empty collection for an unknown key, hence no containsKey check.
        MessagesChangePublisher[] targets;
        synchronized (publishers) {
            targets = publishers.get(str).toArray(new MessagesChangePublisher[0]);
        }
        for (MessagesChangePublisher target : targets) {
            target.publishMessage(nodeProcessorMessage);
        }
    }

    /** Hands one batch of live messages to the inherited {@code pushUpdate}. */
    private void pushMessages(@NotNull List<NodeProcessorMessage> list) {
        pushUpdate(list);
    }

    /** Feeds a single message into this publisher's batching pipeline. */
    private void publishMessage(@NotNull NodeProcessorMessage nodeProcessorMessage) {
        // Subject.onNext must not be invoked concurrently; the static entry point can be reached
        // from any thread, so calls are serialized per instance.
        synchronized (this.messagesPublisher) {
            this.messagesPublisher.onNext(nodeProcessorMessage);
        }
    }

    /**
     * Schedules a one-off push of already stored messages {@value #INIT_DELAY_MS} ms from now.
     *
     * @param workFlowExecution execution whose message store is read
     * @param i                 limit passed to {@code getExecutionMessages}
     * @return handle that cancels the pending push
     */
    @NotNull
    private Disposable addInitData(WorkFlowExecution workFlowExecution, int i) {
        return Completable.timer(INIT_DELAY_MS, TimeUnit.MILLISECONDS, Schedulers.single()).subscribe(() -> {
            if (AppConfig.shuttingDown) {
                return;
            }
            pushUpdate(this.workFlowFactory.getStorageLoader().get(workFlowExecution).getMessagesStore().getExecutionMessages(PaginationDirection.OLDER, 0L, i));
        }, th -> {
            // Without an error consumer a storage failure would be rethrown as
            // OnErrorNotImplementedException through the global RxJava error handler.
            log.error("Failed to push initial messages for ExecutionId: {}", this.executionId, th);
        });
    }

    /**
     * Always returns {@code null}; this class delivers its data through {@code pushUpdate}.
     * NOTE(review): presumably the superclass' polling hook - confirm.
     */
    /* renamed from: getUpdate, reason: merged with bridge method [inline-methods] */
    public List<NodeProcessorMessage> m238getUpdate() {
        return null;
    }

    /** Cancels both Rx subscriptions and removes this publisher from the registry. */
    public void onClose() {
        disposeIfActive(this.disposeMessagesPublisher);
        disposeIfActive(this.disposeSendOnInit);
        int total;
        synchronized (publishers) {
            publishers.remove(this.executionId, this);
            total = publishers.size();
        }
        log.trace("Removed Publisher ExecutionId: {}, TransactionId: {}, Total: {}", this.executionId, this.transactionId, total);
    }

    /** Disposes the given handle unless it is {@code null} or already disposed. */
    private static void disposeIfActive(Disposable disposable) {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }

    /**
     * Hash derived from the inherited transaction id. {@code equals} is not overridden in this
     * class, so registry removal in {@link #onClose()} relies on the inherited equality.
     */
    @Override
    public int hashCode() {
        return this.transactionId.hashCode();
    }
}
