Source code for sceptre.plan.executor

# -*- coding: utf-8 -*-


"""
This module implements a SceptrePlanExecutor, which is responsible for
executing the command specified in a SceptrePlan.
"""
import logging

from concurrent.futures import ThreadPoolExecutor, as_completed
from sceptre.plan.actions import StackActions
from sceptre.stack_status import StackStatus

class SceptrePlanExecutor(object):
    """
    Executes a single command against batches of Stacks. The Stacks within
    a batch are run concurrently; the batches are run one after another in
    the order given by ``launch_order``.
    """

    def __init__(self, command, launch_order):
        """
        Initialises a SceptrePlanExecutor, stores the launch order and
        derives the number of threads and the initial Stack statuses from it.

        :param command: The command to execute on the Stack. It is looked up
            by name as a method on StackActions.
        :type command: str
        :param launch_order: A list containing sets of Stacks that can be
            executed concurrently.
        :type launch_order: list
        """
        self.logger = logging.getLogger(__name__)
        self.command = command
        self.launch_order = launch_order
        # One worker per Stack in the largest batch. max() raises ValueError
        # on an empty sequence, so an empty launch order maps to 0 threads.
        self.num_threads = (
            max(len(batch) for batch in launch_order) if launch_order else 0
        )
        # Every Stack starts out as PENDING.
        self.stack_statuses = {
            stack: StackStatus.PENDING
            for batch in launch_order
            for stack in batch
        }

    def execute(self, *args):
        """
        Executes the sets of Stacks in `launch_order` concurrently, in the
        correct order. A batch is only started once every Stack in the
        previous batch has finished.

        :param *args: Any arguments that should be passed through to the
            StackAction being called.
        :returns: A dict mapping each Stack to the value returned by the
            command for that Stack. Empty when there are no Stacks to run.
        :rtype: dict
        :raises: Any exception raised while running the command for a Stack
            is re-raised here when that Stack's result is collected.
        """
        responses = {}

        # ThreadPoolExecutor rejects max_workers <= 0; zero threads means
        # every batch is empty, so there is nothing to execute.
        if self.num_threads < 1:
            return responses

        with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
            for batch in self.launch_order:
                futures = [
                    executor.submit(self._execute, stack, *args)
                    for stack in batch
                ]
                # Draining as_completed() blocks until the whole batch is
                # done, which keeps the batches strictly ordered.
                for future in as_completed(futures):
                    stack, status = future.result()
                    responses[stack] = status

        return responses

    def _execute(self, stack, *args):
        """
        Runs the configured command for a single Stack.

        :param stack: The Stack to run the command against.
        :param *args: Arguments passed through to the command.
        :returns: A ``(stack, result)`` tuple, where result is whatever the
            StackActions method named by ``self.command`` returned.
        :rtype: tuple
        """
        actions = StackActions(stack)
        result = getattr(actions, self.command)(*args)
        return stack, result