package com.atlassian.pipelines.runner.core.service;

import com.atlassian.pipelines.runner.api.RunnerState;
import com.atlassian.pipelines.runner.api.configuration.RunnerConfiguration;
import com.atlassian.pipelines.runner.api.configuration.schedule.ScheduleConfiguration;
import com.atlassian.pipelines.runner.api.model.runner.Runner;
import com.atlassian.pipelines.runner.api.model.step.Step;
import com.atlassian.pipelines.runner.api.model.step.StepId;
import com.atlassian.pipelines.runner.api.service.RunnerService;
import com.atlassian.pipelines.runner.api.service.ScheduleService;
import com.atlassian.pipelines.runner.api.util.rx.RxSchedulers;
import com.atlassian.pipelines.runner.core.configuration.Runtime;
import com.atlassian.pipelines.runner.core.exception.RunnerDisabledException;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.vavr.control.Option;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Profile({Runtime.Strings.LINUX_SHELL, Runtime.Strings.LINUX_DOCKER, Runtime.Strings.MACOS_BASH, Runtime.Strings.MACOS_TART, Runtime.Strings.WINDOWS_POWERSHELL, Runtime.Strings.ALWAYS_FAIL})
@Component
/* loaded from: input_file:com/atlassian/pipelines/runner/core/service/ScheduleServiceImpl.class */
public final class ScheduleServiceImpl implements ScheduleService {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) ScheduleServiceImpl.class);
    private final ScheduleConfiguration scheduleConfiguration;
    private final RxSchedulers rxSchedulers;
    private final RunnerService runnerService;
    private final RunnerState runnerState;

    @Autowired
    public ScheduleServiceImpl(RunnerConfiguration runnerConfiguration, RxSchedulers rxSchedulers, RunnerService runnerService, RunnerState runnerState) {
        this.scheduleConfiguration = runnerConfiguration.getStateUpdateScheduleConfiguration();
        this.rxSchedulers = rxSchedulers;
        this.runnerService = runnerService;
        this.runnerState = runnerState;
    }

    @Override // com.atlassian.pipelines.runner.api.service.ScheduleService
    public Observable<StepId> updateRunnerToOnlineAndGetStepId() {
        return Observable.interval(this.scheduleConfiguration.getInitialDelay().getSeconds(), this.scheduleConfiguration.getPeriod().getSeconds(), TimeUnit.SECONDS, this.rxSchedulers.polling()).doOnSubscribe(disposable -> {
            logger.info("Updating runner status to \"ONLINE\" and checking for new steps assigned to the runner after {} seconds and then every {} seconds.", Long.valueOf(this.scheduleConfiguration.getInitialDelay().getSeconds()), Long.valueOf(this.scheduleConfiguration.getPeriod().getSeconds()));
        }).filter(l -> {
            return (this.runnerState.isShuttingDown() || this.runnerState.isUnhealthy()) ? false : true;
        }).flatMapSingle(l2 -> {
            return this.runnerService.updateToOnline();
        }).flatMapSingle(this::handleRunnerDisabled).flatMapMaybe(runner -> {
            return runner.getStepId().isDefined() ? Maybe.just(runner.getStepId().get()) : Maybe.empty();
        }).retryWhen(observable -> {
            return observable.flatMap(th -> {
                return th instanceof RunnerDisabledException ? Observable.error(th) : Observable.just(th);
            }).delay(this.scheduleConfiguration.getPeriod().getSeconds(), TimeUnit.SECONDS, this.rxSchedulers.polling());
        });
    }

    private Single<Runner> handleRunnerDisabled(Runner runner) {
        if (!runner.isDisabled() || this.runnerState.isExecutingStep()) {
            return Single.just(runner);
        }
        this.runnerState.setDisabled();
        return Single.error(new RunnerDisabledException("Runner has been disabled. Shutting down."));
    }

    private Single<Runner> handleLogRequested(Runner runner, Step step) {
        if (runner.getRunningStepLogRequested().getOrElse((Option<Boolean>) false).equals(true)) {
            step.getLogContext().getLogUploadManager().startSendingLogsInRealtime(step.getServices());
        }
        return Single.just(runner);
    }

    @Override // com.atlassian.pipelines.runner.api.service.ScheduleService
    public Observable<StepId> pollForStepCompletion(Step step) {
        return Observable.interval(this.scheduleConfiguration.getPeriod().getSeconds(), TimeUnit.SECONDS, this.rxSchedulers.polling()).doOnSubscribe(disposable -> {
            logger.info("Checking for step completion every {} seconds.", this.scheduleConfiguration.getPeriod());
        }).flatMapSingle(l -> {
            return this.runnerService.get();
        }).flatMapSingle(runner -> {
            return handleLogRequested(runner, step);
        }).flatMapMaybe(runner2 -> {
            Option<StepId> stepId = runner2.getStepId();
            StepId id = step.getId();
            Objects.requireNonNull(id);
            return stepId.exists((v1) -> {
                return r1.equals(v1);
            }) ? Maybe.empty() : Maybe.just(step.getId());
        }).retryWhen(observable -> {
            return observable.delay(this.scheduleConfiguration.getPeriod().getSeconds(), TimeUnit.SECONDS, this.rxSchedulers.polling());
        });
    }
}
