createBatchesWith(new BatchSizeDivisor($batchSize))
            ->transferWith(new WriteRequestBatchTransfer($client));

        // Only register a notification callback when the caller supplied one.
        if ($notify) {
            $builder->notify($notify);
        }

        // Build the underlying batch, wrap it in this decorator, then wrap
        // that in a FlushingBatch configured with the same batch size.
        $batch = new self($builder->build());
        $batch = new FlushingBatch($batch, $batchSize);

        return $batch;
    }

    /**
     * {@inheritdoc}
     *
     * Accepts either a write request object or a PutItem/DeleteItem command.
     * Commands are converted through the static fromCommand() method of the
     * class in this namespace whose name is the command name with "Item"
     * replaced by "Request".
     *
     * @param mixed $item Write request or PutItem/DeleteItem command to queue
     *
     * @return mixed Whatever the decorated batch's add() returns
     * @throws InvalidArgumentException if a command other than PutItem or
     *                                  DeleteItem is given, or if the item is
     *                                  not a write request after conversion
     */
    public function add($item)
    {
        if ($item instanceof AbstractCommand) {
            $commandName = $item->getName();

            // Only the two item-level write commands can be converted
            if (!in_array($commandName, array('PutItem', 'DeleteItem'))) {
                throw new InvalidArgumentException('The command provided was not a PutItem or DeleteItem command.');
            }

            // Derive the request class name from the command name
            $requestClass = __NAMESPACE__ . '\\' . str_replace('Item', 'Request', $commandName);
            $item = $requestClass::fromCommand($item);
        }

        if ($item instanceof WriteRequestInterface) {
            return $this->decoratedBatch->add($item);
        }

        throw new InvalidArgumentException('The item you are trying to add to the batch queue is invalid.');
    }

    /**
     * {@inheritdoc}
     *
     * Keeps flushing the decorated batch until it reports empty. When a
     * transfer fails because of unprocessed write requests, those requests are
     * added back to the queue so a later pass of the loop retries them.
     *
     * @return array Items returned by the decorated batch's flush() calls
     * @throws BatchTransferException if a transfer fails for any reason other
     *                                than unprocessed write requests
     */
    public function flush()
    {
        $flushed = array();

        while (!$this->decoratedBatch->isEmpty()) {
            try {
                $flushed = array_merge($flushed, $this->decoratedBatch->flush());
            } catch (BatchTransferException $transferError) {
                $cause = $transferError->getPrevious();

                // Anything other than unprocessed write requests is not
                // recoverable here, so hand the failure back to the caller
                if (!($cause instanceof UnprocessedWriteRequestsException)) {
                    throw $transferError;
                }

                // Re-queue every request carried by the exception; the
                // enclosing loop picks them up again
                foreach ($cause as $pendingRequest) {
                    $this->add($pendingRequest);
                }
            }
        }

        return $flushed;
    }
}