source->isLocal() || $this->source->getWrapper() != 'plainfile') { throw new RuntimeException('The source data must be a local file stream when uploading in parallel.'); } if (empty($this->options['concurrency'])) { throw new RuntimeException('The `concurrency` option must be specified when instantiating.'); } }

    /**
     * {@inheritdoc}
     *
     * Uploads the source in batches of at most `concurrency` UploadPart commands.
     * Each batch is handed to the client in a single execute() call, and every
     * returned command is recorded as a part in the transfer state. The loop ends
     * when the state holds all expected parts, when the transfer is stopped, or
     * when a batch turns out to contain no commands.
     */
    protected function transfer()
    {
        $totalParts  = (int) ceil($this->source->getContentLength() / $this->partSize);
        $concurrency = min($totalParts, $this->options['concurrency']);
        $partsToSend = $this->prepareParts($concurrency);
        $eventData   = $this->getEventData();

        while (!$this->stopped && count($this->state) < $totalParts) {

            // Number of parts already recorded; the state is only modified after the
            // batch has been executed, so this value is stable while building commands.
            $currentTotal = count($this->state);
            $commands = array();

            for ($i = 0; $i < $concurrency && $i + $currentTotal < $totalParts; $i++) {

                // Move the offset to the correct position
                $partsToSend[$i]->setOffset(($currentTotal + $i) * $this->partSize);

                // @codeCoverageIgnoreStart
                if ($partsToSend[$i]->getContentLength() == 0) {
                    break;
                }
                // @codeCoverageIgnoreEnd

                $params = $this->state->getUploadId()->toParams();
                $eventData['command'] = $this->client->getCommand('UploadPart', array_replace($params, array(
                    'PartNumber' => $currentTotal + 1 + $i,
                    'Body'       => $partsToSend[$i],
                    'ContentMD5' => (bool) $this->options['part_md5'],
                    Ua::OPTION   => Ua::MULTIPART_UPLOAD
                )));
                $commands[] = $eventData['command'];

                // Notify any listeners of the part upload
                $this->dispatch(self::BEFORE_PART_UPLOAD, $eventData);
            }

            // Allow listeners to stop the transfer if needed
            if ($this->stopped) {
                break;
            }

            // Nothing could be queued (the first body of this batch reported a zero
            // content length). Executing an empty batch would add no parts to the
            // state, so the loop condition would never change; stop instead of
            // spinning forever.
            if (!$commands) {
                break;
            }

            // Execute each command, iterate over the results, and add to the transfer state
            /** @var \Guzzle\Service\Command\OperationCommand $command */
            foreach ($this->client->execute($commands) as $command) {
                $this->state->addPart(UploadPart::fromArray(array(
                    'PartNumber'   => $command['PartNumber'],
                    'ETag'         => $command->getResponse()->getEtag(),
                    'Size'         => (int) $command->getRequest()->getBody()->getContentLength(),
                    'LastModified' => gmdate(DateFormat::RFC2822)
                )));
                $eventData['command'] = $command;

                // Notify any listeners that the part was uploaded
                $this->dispatch(self::AFTER_PART_UPLOAD, $eventData);
            }
        }
    }

    /**
     * Prepare the entity body handles to use while transferring
     *
     * The first handle wraps the existing source body; every additional handle is
     * a fresh read-only stream opened on the source's URI. Each handle is wrapped
     * in a ReadLimitEntityBody constructed with the configured part size. At least
     * one handle is always returned, even when $concurrency is less than 1.
     *
     * @param int $concurrency Number of parts to prepare
     *
     * @return array Parts to send
     */
    protected function prepareParts($concurrency)
    {
        $url = $this->source->getUri();

        // Use the source EntityBody as the first part
        $parts = array(new ReadLimitEntityBody($this->source, $this->partSize));

        // Open EntityBody handles for each part to upload in parallel
        for ($i = 1; $i < $concurrency; $i++) {
            $parts[] = new ReadLimitEntityBody(new EntityBody(fopen($url, 'r')), $this->partSize);
        }

        return $parts;
    }
}