self::DEFAULT_BATCH_SIZE, 'callPeriod' => self::DEFAULT_CALL_PERIOD, 'bootstrapFile' => null, 'numWorkers' => self::DEFAULT_WORKERS]; $this->identifier = $identifier; $this->func = $func; $this->id = $idNum; $this->batchSize = $options['batchSize']; $this->callPeriod = $options['callPeriod']; $this->bootstrapFile = $options['bootstrapFile']; $this->numWorkers = $options['numWorkers']; $this->initFailureFile(); } /** * Run the job. */ public function run() { $this->setupSignalHandlers(); $sysvKey = $this->getSysvKey($this->id); $q = msg_get_queue($sysvKey); $items = []; $lastInvoked = microtime(true); if (!is_null($this->bootstrapFile)) { require_once $this->bootstrapFile; } while (true) { // Fire SIGALRM after 1 second to unblock the blocking call. pcntl_alarm(1); if (msg_receive( $q, 0, $type, 8192, $message, true, 0, // blocking mode $errorcode )) { if ($type === self::$typeDirect) { $items[] = $message; } elseif ($type === self::$typeFile) { $items[] = unserialize(file_get_contents($message)); @unlink($message); } } pcntl_signal_dispatch(); // It runs the job when // 1. Number of items reaches the batchSize. // 2-a. Count is >0 and the current time is larger than lastInvoked + period. // 2-b. Count is >0 and the shutdown flag is true. if (count($items) >= $this->batchSize || count($items) > 0 && (microtime(true) > $lastInvoked + $this->callPeriod || $this->shutdown)) { printf('Running the job with %d items' . PHP_EOL, count($items)); $this->flush($items); $items = []; $lastInvoked = microtime(true); } gc_collect_cycles(); if ($this->shutdown) { return; } } } /** * Finish any pending activity for this job. * * @param array $items * @return bool */ public function flush(array $items = []) { if (!$this->callFunc($items)) { $this->handleFailure($this->id, $items); return false; } return true; } /** * Finish any pending activity for this job. * * @access private * @internal * * @param array $items * @return bool */ public function callFunc(array $items = []) { return call_user_func_array($this->func, [$items]); } /** * Returns the period in seconds from the last execution to force * executing the job. * * @return float */ public function getCallPeriod() { return $this->callPeriod; } /** * Returns the batch size. * * @return int */ public function getBatchSize() { return $this->batchSize; } }