isSysvIPCLoaded() && $this->isDaemonRunning()) { $configStorage = new \DeliciousBrains\WP_Offload_Media\Gcp\Google\Cloud\Core\Batch\SysvConfigStorage(); $processor = new \DeliciousBrains\WP_Offload_Media\Gcp\Google\Cloud\Core\Batch\SysvProcessor(); } else { $configStorage = \DeliciousBrains\WP_Offload_Media\Gcp\Google\Cloud\Core\Batch\InMemoryConfigStorage::getInstance(); $processor = $configStorage; } } $this->configStorage = $configStorage; $this->processor = $processor; $this->loadConfig(); } /** * Register a job for batch execution. * * @param string $identifier Unique identifier of the job. * @param callable $func Any Callable except for Closure. The callable * should accept an array of items as the first argument. * @param array $options [optional] { * Configuration options. * * @type int $batchSize The size of the batch. * @type float $callPeriod The period in seconds from the last execution * to force executing the job. * @type int $numWorkers The number of child processes. It only takes * effect with the {@see \Google\Cloud\Core\Batch\BatchDaemon}. * @type string $bootstrapFile A file to load before executing the * job. It's needed for registering global functions. * } * @return bool true on success, false on failure * @throws \InvalidArgumentException When receiving a Closure. */ public function registerJob($identifier, $func, array $options = []) { if ($func instanceof \Closure) { throw new \InvalidArgumentException('Closure is not allowed'); } // Always work on the latest data $result = $this->configStorage->lock(); if ($result === false) { return false; } $this->config = $this->configStorage->load(); $this->config->registerJob($identifier, function ($id) use($identifier, $func, $options) { return new \DeliciousBrains\WP_Offload_Media\Gcp\Google\Cloud\Core\Batch\BatchJob($identifier, $func, $id, $options); }); try { $result = $this->configStorage->save($this->config); } finally { $this->configStorage->unlock(); } return $result; } /** * Submit an item. * * @param string $identifier Unique identifier of the job. * @param mixed $item It needs to be serializable. * * @return bool true on success, false on failure * @throws \RuntimeException */ public function submitItem($identifier, $item) { $job = $this->getJobFromId($identifier); if ($job === null) { throw new \RuntimeException("The identifier does not exist: {$identifier}"); } $idNum = $job->id(); return $this->processor->submit($item, $idNum); } /** * Get the job with the given identifier. * * @param string $identifier Unique identifier of the job. * * @return BatchJob|null */ public function getJobFromId($identifier) { return $this->config->getJobFromId($identifier); } /** * Get the job with the given numeric id. * * @param int $idNum A numeric id of the job. * * @return BatchJob|null */ public function getJobFromIdNum($idNum) { return $this->config->getJobFromIdNum($idNum); } /** * Get all the jobs. * * @return BatchJob[] */ public function getJobs() { return $this->config->getJobs(); } /** * Load the config from the storage. * * @return bool true on success * @throws \RuntimeException when it fails to load the config. */ public function loadConfig() { $result = $this->configStorage->lock(); if ($result === false) { throw new \RuntimeException('Failed to lock the configStorage'); } try { $result = $this->configStorage->load(); } catch (\RuntimeException $e) { $this->configStorage->clear(); throw $e; } finally { $this->configStorage->unlock(); } $this->config = $result; return true; } /** * Gets the item processor. * * @return ProcessItemInterface */ public function getProcessor() { return $this->processor; } }