client = $client; } /** * {@inheritdoc} */ public function transfer(array $batch) { // Create a container exception for any unprocessed items $unprocessed = new UnprocessedWriteRequestsException(); // Execute the transfer logic $this->performTransfer($batch, $unprocessed); // Throw an exception containing the unprocessed items if there are any if (count($unprocessed)) { throw $unprocessed; } } /** * Transfer a batch of requests and collect any unprocessed items * * @param array $batch A batch of write requests * @param UnprocessedWriteRequestsException $unprocessedRequests Collection of unprocessed items * * @throws \Guzzle\Common\Exception\ExceptionCollection */ protected function performTransfer( array $batch, UnprocessedWriteRequestsException $unprocessedRequests ) { // Do nothing if the batch is empty if (empty($batch)) { return; } // Prepare an array of commands to be sent in parallel from the batch $commands = $this->prepareCommandsForBatchedItems($batch); // Execute the commands and handle exceptions try { $commands = $this->client->execute($commands); $this->getUnprocessedRequestsFromCommands($commands, $unprocessedRequests); } catch (ExceptionCollection $exceptions) { // Create a container exception for any unhandled (true) exceptions $unhandledExceptions = new ExceptionCollection(); // Loop through caught exceptions and handle RequestTooLarge scenarios /** @var DynamoDbException $e */ foreach ($exceptions as $e) { if ($e instanceof DynamoDbException) { $request = $e->getRequest(); if ($e->getStatusCode() === 413) { $this->retryLargeRequest($request, $unprocessedRequests); } elseif ($e->getExceptionCode() === 'ProvisionedThroughputExceededException') { $this->handleUnprocessedRequestsAfterException($request, $unprocessedRequests); } else { $unhandledExceptions->add($e); } } else { $unhandledExceptions->add($e); } } // If there were unhandled exceptions, throw them if (count($unhandledExceptions)) { throw $unhandledExceptions; } } } /** * Prepares an array of BatchWriteItem command objects for a given batch of items * * @param array $batch A batch of write requests * * @return array */ protected function prepareCommandsForBatchedItems(array $batch) { $commands = array(); foreach (array_chunk($batch, self::BATCH_WRITE_MAX_SIZE) as $chunk) { // Convert the request items into the format required by the client $items = array(); foreach ($chunk as $item) { if ($item instanceof AbstractWriteRequest) { /** @var AbstractWriteRequest $item */ $table = $item->getTableName(); if (!isset($items[$table])) { $items[$table] = array(); } $items[$table][] = $item->toArray(); } } // Create the BatchWriteItem request $commands[] = $this->client->getCommand('BatchWriteItem', array( 'RequestItems' => $items, Ua::OPTION => Ua::BATCH )); } return $commands; } /** * Handles unprocessed items from the executed commands. Unprocessed items * can be collected and thrown in an UnprocessedWriteRequestsException * * @param array $commands Array of commands * @param UnprocessedWriteRequestsException $unprocessedRequests Collection of unprocessed items */ protected function getUnprocessedRequestsFromCommands( array $commands, UnprocessedWriteRequestsException $unprocessedRequests ) { /** @var CommandInterface $command */ foreach ($commands as $command) { if ($command instanceof CommandInterface && $command->isExecuted()) { $result = $command->getResult(); $items = $this->convertResultsToUnprocessedRequests($result['UnprocessedItems']); foreach ($items as $request) { $unprocessedRequests->addItem($request); } } } } /** * Handles exceptions caused by the request being too large (over 1 MB). The * response will have a status code of 413. In this case the batch should be * split up into smaller batches and retried. * * @param EntityEnclosingRequestInterface $request The failed request * @param UnprocessedWriteRequestsException $unprocessedRequests Collection of unprocessed items */ protected function retryLargeRequest( EntityEnclosingRequestInterface $request, UnprocessedWriteRequestsException $unprocessedRequests ) { // Collect the items out from the request object $items = $this->extractItemsFromRequestObject($request); // Divide batch into smaller batches and transfer them via recursion // NOTE: Dividing the batch into 3 (instead of 2) batches resulted in less recursion during testing if ($items) { $newBatches = array_chunk($items, ceil(count($items) / 3)); foreach ($newBatches as $newBatch) { $this->performTransfer($newBatch, $unprocessedRequests); } } } /** * Handles unprocessed items if the entire batch was rejected due to exceeding the provisioned throughput * * @param EntityEnclosingRequestInterface $request The failed request * @param UnprocessedWriteRequestsException $unprocessedRequests Collection of unprocessed items */ protected function handleUnprocessedRequestsAfterException( EntityEnclosingRequestInterface $request, UnprocessedWriteRequestsException $unprocessedRequests ) { $items = $this->extractItemsFromRequestObject($request); foreach ($items as $request) { $unprocessedRequests->addItem($request); } } /** * Collects and creates unprocessed request objects from data collected from erroneous cases * * @param array $items Data formatted under "RequestItems" or "UnprocessedItems" keys * * @return array */ protected function convertResultsToUnprocessedRequests(array $items) { $unprocessed = array(); foreach ($items as $table => $requests) { foreach ($requests as $request) { $unprocessed[] = new UnprocessedRequest($request, $table); } } return $unprocessed; } /** * Helper method to extract the items from a request object for a BatchWriteItem operation * * @param EntityEnclosingRequestInterface $request * * @return array */ private function extractItemsFromRequestObject(EntityEnclosingRequestInterface $request) { $items = json_decode((string) $request->getBody(), true); return $this->convertResultsToUnprocessedRequests($items['RequestItems'] ?: array()); } }