package com.nazdaq.workflow.graphql.resolvers.subscriptions.manager;

import com.nazdaq.core.helpers.AppConfig;
import com.nazdaq.workflow.engine.core.manager.WorkFlowFactory;
import com.nazdaq.workflow.graphql.models.manager.WorkFlowManagerFilter;
import graphql.execution.ExecutionId;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import models.users.User;
import models.workflow.builder.WorkFlowParent;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nazdaq/workflow/graphql/resolvers/subscriptions/manager/ManagerPublisher.class */
public class ManagerPublisher {
    private static final Logger log = LoggerFactory.getLogger(ManagerPublisher.class);
    private static final ConcurrentHashMap<ExecutionId, ManagerPublisherSubscriber> subscribers = new ConcurrentHashMap<>();
    private static final PublishSubject<Long> publishSubject = PublishSubject.create();
    private static Disposable disposePublisher = null;

    @NotNull
    public static ManagerPublisherSubscriber create(WorkFlowFactory workFlowFactory, User user, ExecutionId executionId, WorkFlowManagerFilter workFlowManagerFilter) {
        ManagerPublisherSubscriber managerPublisherSubscriber = new ManagerPublisherSubscriber(workFlowFactory, user, executionId, workFlowManagerFilter);
        subscribers.put(executionId, managerPublisherSubscriber);
        if (disposePublisher == null) {
            initMainPublisher();
        }
        if (log.isTraceEnabled()) {
            log.trace("Added Publisher ExecutionId: {}, User: {}, Total: {}", new Object[]{executionId, user.getUsername(), Integer.valueOf(subscribers.size())});
        }
        return managerPublisherSubscriber;
    }

    public static void removeSubscriber(ExecutionId executionId) {
        subscribers.remove(executionId);
        if (subscribers.isEmpty() && disposePublisher != null && !disposePublisher.isDisposed()) {
            disposePublisher.dispose();
            disposePublisher = null;
            log.trace("Stopped Manager publisher.");
        }
        if (log.isTraceEnabled()) {
            log.trace("Removed Publisher ExecutionId: {}, Total: {}", executionId, Integer.valueOf(subscribers.size()));
        }
    }

    private static void initMainPublisher() {
        disposePublisher = publishSubject.observeOn(Schedulers.io()).buffer(450L, TimeUnit.MILLISECONDS).map(list -> {
            return new ArrayList(new HashSet(list));
        }).subscribe(arrayList -> {
            if (AppConfig.shuttingDown || arrayList.isEmpty()) {
                return;
            }
            ArrayList arrayList = new ArrayList();
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                arrayList.add(WorkFlowParent.getById((Long) it.next()));
            }
            Iterator<ManagerPublisherSubscriber> it2 = subscribers.values().iterator();
            while (it2.hasNext()) {
                it2.next().pushWorkflows(arrayList);
            }
        }, th -> {
            log.error("Error", th);
        });
        log.trace("Started Manager publisher ...");
    }

    private static void addToQueue(long j) {
        publishSubject.onNext(Long.valueOf(j));
    }

    public static void addWorkFlowToQueue(Long l) {
        if (subscribers.isEmpty() || AppConfig.shuttingDown) {
            return;
        }
        addToQueue(l.longValue());
    }
}
