#!/usr/bin/env php
<?php declare(strict_types = 1);

use Amp\ByteStream;
use Amp\Cluster\Cluster;
use Amp\Cluster\ClusterWatcher;
use Amp\File;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\TimeoutCancellation;
use League\CLImate\Argument\Manager as CliArgumentManager;
use Monolog\Formatter\LineFormatter;
use Monolog\Logger;
use Psr\Log\LogLevel;
use Revolt\EventLoop;
use function Amp\Cluster\countCpuCores;

/**
 * Help screen printed verbatim for -h/--help and after an argument error.
 *
 * The text is maintained by hand, so keep the option list in sync with the
 * $args definitions further down whenever an option is added or changed.
 */
const HELP = <<<EOT
            __           __
      _____/ /_  _______/ /____  _____
     / ___/ / / / / ___/ __/ _ \/ ___/
    / /__/ / /_/ (__  ) /_/  __/ /
    \___/_/\__,_/____/\__/\___/_/

    Usage: [options] [script path]

    Options:
    -h, --help               Display this help message
    -w, --workers            Manually specify worker count (default: CPU core count)
    -l, --log                Set the minimum log output level (default: debug)
    -f, --file               Log to file path instead of STDOUT (requires amphp/file)
                             Asterisks (*) in path will be replaced with cluster PID
    -n, --name               Cluster process name
    -p, --pid-file           File path to write current PID to
    -t, --shutdown-timeout   Shutdown timeout in seconds
    --                       Marker to separate cluster and worker arguments

    Example Usage:
    cluster --help
    cluster -w 4 /path/to/script.php
    cluster -l warning /another/script/path.php
    cluster server.php -- --port 8080


EOT;

error_reporting(E_ALL);

// Locate and load the autoloader. The work is wrapped in an immediately invoked
// closure so that the temporary variables do not end up in the global scope.
(static function (): void {
    // Candidates are tried in order; the first existing file wins.
    // NOTE(review): the first entry presumably matches an install as a Composer
    // dependency, the second a standalone checkout of this package - confirm.
    $candidates = [
        dirname(__DIR__, 3) . "/autoload.php",
        dirname(__DIR__) . "/vendor/autoload.php",
    ];

    foreach ($candidates as $candidate) {
        if (file_exists($candidate)) {
            require $candidate;

            return;
        }
    }

    // None of the candidates exists: nothing else in this script can work.
    fwrite(STDERR, "Could not locate autoload.php");
    exit(1);
})();

// Command line option definitions handed to the CLImate argument manager.
// The same table is also used to derive the lookup of known "-x" / "--xxx"
// tokens for the argument pre-scan further down, so every option that should
// be accepted on the command line must be listed here.
$args = [
    // Boolean switch: "noValue" means it does not consume the following token.
    "help" => [
        "prefix" => "h",
        "longPrefix" => "help",
        "description" => "Display the help screen",
        "noValue" => true,
    ],
    // Minimum level passed to the log handler.
    "log" => [
        "prefix" => "l",
        "longPrefix" => "log",
        "description" => "Set the minimum log output level",
        "defaultValue" => LogLevel::DEBUG,
    ],
    // Optional log file; "*" in the path is replaced with the cluster PID.
    "file" => [
        "prefix" => "f",
        "longPrefix" => "file",
        "description" => "Log file path. Log messages are written to STDOUT if not specified.",
    ],
    // Number of workers to start; defaults to the detected CPU core count.
    "workers" => [
        "prefix" => "w",
        "longPrefix" => "workers",
        "description" => "Manually specify worker count",
        "castTo" => "int",
        "defaultValue" => countCpuCores(),
    ],
    "pid-file" => [
        "prefix" => "p",
        "longPrefix" => "pid-file",
        "description" => "File path to write current PID to",
    ],
    // Seconds to wait when stopping the cluster; a value <= 0 disables the timeout.
    "shutdown-timeout" => [
        "prefix" => "t",
        "longPrefix" => "shutdown-timeout",
        "description" => "Shutdown timeout in seconds",
        "castTo" => "float",
        "defaultValue" => 1,
    ],
    // Title applied to this process via cli_set_process_title() where available.
    "name" => [
        "prefix" => "n",
        "longPrefix" => "name",
        "description" => "Cluster process name",
        "defaultValue" => "amp-cluster",
    ],
    // No prefix defined: NOTE(review) presumably filled by CLImate from the
    // first non-option argument - confirm against the CLImate documentation.
    "script" => [
        "description" => "Defines the script to run",
    ],
];

// Lookup of every accepted option token ("-x" and "--xxx") to a boolean:
// true when the option is a switch that takes no value, false when the
// option expects a value.
$flags = [];

foreach ($args as $definition) {
    $isSwitch = $definition['noValue'] ?? false;

    // Register the short and the long spelling, each only if it is defined.
    foreach (['-' => 'prefix', '--' => 'longPrefix'] as $dashes => $prefixKey) {
        if (isset($definition[$prefixKey])) {
            $flags[$dashes . $definition[$prefixKey]] = $isSwitch;
        }
    }
}

$arguments = new CliArgumentManager();
$arguments->add($args);

// Parse the command line. Any problem is reported together with the help
// screen and terminates the script with exit code 1.
try {
    // Arguments after the "--" marker; they are forwarded to the workers untouched.
    $workerArguments = [];

    // Pre-scan the raw arguments before handing them to CLImate in order to
    //  - cut off everything after "--" (worker arguments), and
    //  - reject option-like tokens that are not defined in $args.
    for ($i = 1; $i < $argc; $i++) {
        $token = $argv[$i];

        if ($token === '--') {
            $workerArguments = array_slice($argv, $i + 1);
            $argv = array_slice($argv, 0, $i);
            break;
        }

        // An option may carry its value inline ("--workers=4"); only the part
        // before "=" names the option.
        [$optionName] = explode('=', $token, 2);

        if (isset($flags[$optionName])) {
            // A value-taking option written without "=" uses the next token as
            // its value, so that token must not be inspected as an option.
            if (!$flags[$optionName] && $optionName === $token) {
                $i++;
            }
        } elseif (str_starts_with($token, '-')) {
            // Inspect the current token (not $argv[0], the script name).
            throw new Exception("Unknown command line option: {$token}");
        }
    }

    $arguments->parse($argv);

    if ($arguments->defined("help", $argv)) {
        echo HELP;
        exit(0);
    }

    if ($arguments->get("script") === null) {
        throw new Exception("Script path is required");
    }
} catch (Exception $e) {
    echo "Invalid arguments: " . $e->getMessage() . PHP_EOL . PHP_EOL;
    echo HELP;
    exit(1);
}

// some command line SAPIs (e.g. phpdbg) don't have that function
if (function_exists("cli_set_process_title")) {
    @cli_set_process_title($arguments->get("name"));
}

// Write the PID of this process to the requested file and remove the file
// again when the script shuts down.
if ($arguments->defined("pid-file", $argv)) {
    $pidFile = $arguments->get("pid-file");

    // Fail fast when the file cannot be written instead of carrying an
    // unusable path around until shutdown.
    if (file_put_contents($pidFile, (string) getmypid()) === false) {
        fwrite(STDERR, "Error: Could not write PID file to " . $pidFile . PHP_EOL);
        exit(1);
    }

    // Resolve the path now so that removing the file at shutdown does not
    // depend on the working directory at that time. realpath() returns false
    // when it cannot resolve the path; fall back to the path as given so that
    // unlink() never receives a non-string (a TypeError under strict_types).
    $resolvedPidFile = realpath($pidFile);
    $pidFilePath = $resolvedPidFile === false ? $pidFile : $resolvedPidFile;

    register_shutdown_function(static fn () => @unlink($pidFilePath));
}

// Build the logger: either a line-formatted log file (requires amphp/file)
// or console-formatted output on STDOUT.
$level = $arguments->get("log");

if ($arguments->defined("file", $argv)) {
    if (!interface_exists(File\File::class)) {
        throw new Exception("amphp/file must be installed to log to a file");
    }

    // "*" in the path is replaced with the PID of this process. The cast is
    // required: getmypid() returns an int and, with strict_types enabled,
    // str_replace() rejects a non-string replacement with a TypeError.
    $path = str_replace('*', (string) getmypid(), $arguments->get("file"));

    $logHandler = new StreamHandler(File\openFile($path, "a"), $level);
    $formatter = new LineFormatter;
} else {
    $logHandler = new StreamHandler(ByteStream\getStdout(), $level);
    $formatter = new ConsoleFormatter;
}

$formatter->allowInlineLineBreaks(true);
$logHandler->setFormatter($formatter);

// The channel name carries the PID of this process.
$logger = new Logger("cluster-" . getmypid());
$logger->pushHandler($logHandler);

// Remaining runtime settings taken from the parsed command line.
$script = $arguments->get("script");
$workers = $arguments->get("workers");
$shutdownTimeout = $arguments->get("shutdown-timeout");

// Refuse to start when the script path does not point to a regular file.
if (!is_file($script)) {
    echo "Error: Could not find script at path {$script}" . PHP_EOL . PHP_EOL;
    exit(1);
}

$logger->info(sprintf('Starting cluster PID %s with %s workers', getmypid(), $workers));

// $pidFilePath is only set when a PID file was requested on the command line.
if (isset($pidFilePath)) {
    $logger->info("Wrote PID file to {$pidFilePath}");
}

// The watcher receives the script path first, followed by the arguments
// that were given after the "--" marker.
$workerArguments = [$script, ...$workerArguments];
$watcher = new ClusterWatcher($workerArguments, $logger);

// Invoked for every signal returned by Cluster::getSignalList(): stops the
// cluster, bounded by the shutdown timeout when that timeout is positive.
$stopOnSignal = static function (string $callbackId, int $signal) use ($watcher, $logger, $shutdownTimeout): void {
    // React to this signal only once.
    EventLoop::cancel($callbackId);

    $logger->info(sprintf('Stopping cluster due to received signal: %d', $signal));

    $cancellation = null;
    if ($shutdownTimeout > 0) {
        $cancellation = new TimeoutCancellation($shutdownTimeout);
    }

    $watcher->stop($cancellation);
};

// Invoked for SIGUSR1: restarts the cluster. The callback is disabled for the
// duration of the restart and enabled again afterwards.
$restartOnSignal = static function (string $callbackId, int $signal) use ($watcher, $logger): void {
    EventLoop::disable($callbackId);

    $logger->info(sprintf('Restarting cluster due to received signal: %d', $signal));
    $watcher->restart();

    EventLoop::enable($callbackId);
};

// Falls back to the number 10 when the SIGUSR1 constant is not defined.
$restartSignal = defined('SIGUSR1') ? \SIGUSR1 : 10;

try {
    // All signal callbacks are unreferenced right after registration.
    foreach (Cluster::getSignalList() as $stopSignal) {
        $callbackId = EventLoop::onSignal($stopSignal, $stopOnSignal);
        EventLoop::unreference($callbackId);
    }

    $callbackId = EventLoop::onSignal($restartSignal, $restartOnSignal);
    EventLoop::unreference($callbackId);
} catch (EventLoop\UnsupportedFeatureException) {
    // ignore if extensions are missing or OS is Windows
}

$watcher->start($workers);

// Report every message received from a worker through the logger. Values that
// cannot be rendered as a string are reported by their type only.
foreach ($watcher->getMessageIterator() as $workerMessage) {
    $payload = $workerMessage->getData();
    $workerId = $workerMessage->getWorker()->getId();

    $isPrintable = is_scalar($payload) || $payload instanceof \Stringable;

    if (!$isPrintable) {
        $logger->notice(sprintf(
            'Received non-printable message from worker %d of type %s',
            $workerId,
            get_debug_type($payload)
        ));

        continue;
    }

    $logger->info(sprintf('Received message from worker %d: %s', $workerId, $payload));
}
