Source code for sceptre.plan.actions

# -*- coding: utf-8 -*-

"""
sceptre.plan.actions

This module implements the StackActions class which provides the functionality
available to a Stack.
"""

import logging
import time

from os import path
from datetime import datetime, timedelta

import botocore
import json
from dateutil.tz import tzutc

from sceptre.connection_manager import ConnectionManager
from sceptre.hooks import add_stack_hooks
from sceptre.stack_status import StackStatus
from sceptre.stack_status import StackChangeSetStatus

from sceptre.exceptions import CannotUpdateFailedStackError
from sceptre.exceptions import UnknownStackStatusError
from sceptre.exceptions import UnknownStackChangeSetStatusError
from sceptre.exceptions import StackDoesNotExistError
from sceptre.exceptions import ProtectedStackError


[docs]class StackActions(object): """ StackActions stores the operations a Stack can take, such as creating or deleting the Stack. :param stack: A Stack object :type stack: sceptre.stack.Stack """ def __init__(self, stack): self.stack = stack self.name = self.stack.name self.logger = logging.getLogger(__name__) self.connection_manager = ConnectionManager( self.stack.region, self.stack.profile, self.stack.external_name, self.stack.iam_role )
[docs] @add_stack_hooks def create(self): """ Creates a Stack. :returns: The Stack's status. :rtype: sceptre.stack_status.StackStatus """ self._protect_execution() self.logger.info("%s - Creating Stack", self.stack.name) create_stack_kwargs = { "StackName": self.stack.external_name, "Parameters": self._format_parameters(self.stack.parameters), "Capabilities": ['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM', 'CAPABILITY_AUTO_EXPAND'], "NotificationARNs": self.stack.notifications, "Tags": [ {"Key": str(k), "Value": str(v)} for k, v in self.stack.tags.items() ] } if self.stack.on_failure: create_stack_kwargs.update({"OnFailure": self.stack.on_failure}) create_stack_kwargs.update( self.stack.template.get_boto_call_parameter()) create_stack_kwargs.update(self._get_role_arn()) create_stack_kwargs.update(self._get_stack_timeout()) try: response = self.connection_manager.call( service="cloudformation", command="create_stack", kwargs=create_stack_kwargs ) self.logger.debug( "%s - Create stack response: %s", self.stack.name, response ) status = self._wait_for_completion() except botocore.exceptions.ClientError as exp: if exp.response["Error"]["Code"] == "AlreadyExistsException": self.logger.info( "%s - Stack already exists", self.stack.name ) status = "COMPLETE" else: raise return status
[docs] @add_stack_hooks def update(self): """ Updates the Stack. :returns: The Stack's status. :rtype: sceptre.stack_status.StackStatus """ self._protect_execution() self.logger.info("%s - Updating Stack", self.stack.name) try: update_stack_kwargs = { "StackName": self.stack.external_name, "Parameters": self._format_parameters(self.stack.parameters), "Capabilities": [ 'CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM', 'CAPABILITY_AUTO_EXPAND' ], "NotificationARNs": self.stack.notifications, "Tags": [ {"Key": str(k), "Value": str(v)} for k, v in self.stack.tags.items() ] } update_stack_kwargs.update( self.stack.template.get_boto_call_parameter()) update_stack_kwargs.update(self._get_role_arn()) response = self.connection_manager.call( service="cloudformation", command="update_stack", kwargs=update_stack_kwargs ) status = self._wait_for_completion(self.stack.stack_timeout) self.logger.debug( "%s - Update Stack response: %s", self.stack.name, response ) # Cancel update after timeout if status == StackStatus.IN_PROGRESS: status = self.cancel_stack_update() return status except botocore.exceptions.ClientError as exp: error_message = exp.response["Error"]["Message"] if error_message == "No updates are to be performed.": self.logger.info( "%s - No updates to perform.", self.stack.name ) return StackStatus.COMPLETE else: raise
[docs] def cancel_stack_update(self): """ Cancels a Stack update. :returns: The cancelled Stack status. :rtype: sceptre.stack_status.StackStatus """ self.logger.warning( "%s - Update Stack time exceeded the specified timeout", self.stack.name ) response = self.connection_manager.call( service="cloudformation", command="cancel_update_stack", kwargs={"StackName": self.stack.external_name} ) self.logger.debug( "%s - Cancel update Stack response: %s", self.stack.name, response ) return self._wait_for_completion()
[docs] @add_stack_hooks def launch(self): """ Launches the Stack. If the Stack status is create_failed or rollback_complete, the Stack is deleted. Launch then tries to create or update the Stack, depending if it already exists. If there are no updates to be performed, launch exits gracefully. :returns: The Stack's status. :rtype: sceptre.stack_status.StackStatus """ self._protect_execution() self.logger.info("%s - Launching Stack", self.stack.name) try: existing_status = self._get_status() except StackDoesNotExistError: existing_status = "PENDING" self.logger.info( "%s - Stack is in the %s state", self.stack.name, existing_status ) if existing_status == "PENDING": status = self.create() elif existing_status in ["CREATE_FAILED", "ROLLBACK_COMPLETE"]: self.delete() status = self.create() elif existing_status.endswith("COMPLETE"): status = self.update() elif existing_status.endswith("IN_PROGRESS"): self.logger.info( "%s - Stack action is already in progress state and cannot " "be updated", self.stack.name ) status = StackStatus.IN_PROGRESS elif existing_status.endswith("FAILED"): status = StackStatus.FAILED raise CannotUpdateFailedStackError( "'{0}' is in a the state '{1}' and cannot be updated".format( self.stack.name, existing_status ) ) else: raise UnknownStackStatusError( "{0} is unknown".format(existing_status) ) return status
[docs] @add_stack_hooks def delete(self): """ Deletes the Stack. :returns: The Stack's status. :rtype: sceptre.stack_status.StackStatus """ self._protect_execution() self.logger.info("%s - Deleting stack", self.stack.name) try: status = self._get_status() except StackDoesNotExistError: self.logger.info("%s - Does not exist.", self.stack.name) status = StackStatus.COMPLETE return status delete_stack_kwargs = {"StackName": self.stack.external_name} delete_stack_kwargs.update(self._get_role_arn()) self.connection_manager.call( service="cloudformation", command="delete_stack", kwargs=delete_stack_kwargs ) try: status = self._wait_for_completion() except StackDoesNotExistError: status = StackStatus.COMPLETE except botocore.exceptions.ClientError as error: if error.response["Error"]["Message"].endswith("does not exist"): status = StackStatus.COMPLETE else: raise self.logger.info("%s - delete %s", self.stack.name, status) return status
[docs] def lock(self): """ Locks the Stack by applying a deny-all updates Stack Policy. """ policy_path = path.join( # need to get to the base install path. __file__ will take us into # sceptre/actions so need to walk up the path. path.abspath(path.join(__file__, "..", "..")), "stack_policies/lock.json" ) self.set_policy(policy_path) self.logger.info("%s - Successfully locked Stack", self.stack.name)
[docs] def unlock(self): """ Unlocks the Stack by applying an allow-all updates Stack Policy. """ policy_path = path.join( # need to get to the base install path. __file__ will take us into # sceptre/actions so need to walk up the path. path.abspath(path.join(__file__, "..", "..")), "stack_policies/unlock.json" ) self.set_policy(policy_path) self.logger.info("%s - Successfully unlocked Stack", self.stack.name)
[docs] def describe(self): """ Returns the a description of the Stack. :returns: A Stack description. :rtype: dict """ try: return self.connection_manager.call( service="cloudformation", command="describe_stacks", kwargs={"StackName": self.stack.external_name} ) except botocore.exceptions.ClientError as e: if e.response["Error"]["Message"].endswith("does not exist"): return raise
[docs] def describe_events(self): """ Returns the CloudFormation events for a Stack. :returns: CloudFormation events for a Stack. :rtype: dict """ return self.connection_manager.call( service="cloudformation", command="describe_stack_events", kwargs={"StackName": self.stack.external_name} )
[docs] def describe_resources(self): """ Returns the logical and physical resource IDs of the Stack's resources. :returns: Information about the Stack's resources. :rtype: dict """ self.logger.debug("%s - Describing stack resources", self.stack.name) try: response = self.connection_manager.call( service="cloudformation", command="describe_stack_resources", kwargs={"StackName": self.stack.external_name} ) except botocore.exceptions.ClientError as e: if e.response["Error"]["Message"].endswith("does not exist"): return {self.stack.name: []} raise self.logger.debug( "%s - Describe Stack resource response: %s", self.stack.name, response ) desired_properties = ["LogicalResourceId", "PhysicalResourceId"] formatted_response = {self.stack.name: [ {k: v for k, v in item.items() if k in desired_properties} for item in response["StackResources"] ]} return formatted_response
[docs] def describe_outputs(self): """ Returns the Stack's outputs. :returns: The Stack's outputs. :rtype: list """ self.logger.debug("%s - Describing stack outputs", self.stack.name) try: response = self._describe() except botocore.exceptions.ClientError: return [] return {self.stack.name: response["Stacks"][0].get("Outputs", [])}
[docs] def continue_update_rollback(self): """ Rolls back a Stack in the UPDATE_ROLLBACK_FAILED state to UPDATE_ROLLBACK_COMPLETE. """ self.logger.debug("%s - Continuing update rollback", self.stack.name) continue_update_rollback_kwargs = { "StackName": self.stack.external_name } continue_update_rollback_kwargs.update(self._get_role_arn()) self.connection_manager.call( service="cloudformation", command="continue_update_rollback", kwargs=continue_update_rollback_kwargs ) self.logger.info( "%s - Successfully initiated continuation of update rollback", self.stack.name )
[docs] def set_policy(self, policy_path): """ Applies a Stack Policy. :param policy_path: The relative path of JSON file containing\ the AWS Policy to apply. :type policy_path: str """ with open(policy_path) as f: policy = f.read() self.logger.debug( "%s - Setting Stack policy: \n%s", self.stack.name, policy ) self.connection_manager.call( service="cloudformation", command="set_stack_policy", kwargs={ "StackName": self.stack.external_name, "StackPolicyBody": policy } ) self.logger.info("%s - Successfully set Stack Policy", self.stack.name)
[docs] def get_policy(self): """ Returns a Stack's Policy. :returns: The Stack's Stack Policy. :rtype: str """ self.logger.debug("%s - Getting Stack Policy", self.stack.name) response = self.connection_manager.call( service="cloudformation", command="get_stack_policy", kwargs={ "StackName": self.stack.external_name } ) json_formatting = json.loads(response.get( "StackPolicyBody", json.dumps("No Policy Information"))) return {self.stack.name: json_formatting}
[docs] @add_stack_hooks def create_change_set(self, change_set_name): """ Creates a Change Set with the name ``change_set_name``. :param change_set_name: The name of the Change Set. :type change_set_name: str """ create_change_set_kwargs = { "StackName": self.stack.external_name, "Parameters": self._format_parameters(self.stack.parameters), "Capabilities": ['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM', 'CAPABILITY_AUTO_EXPAND'], "ChangeSetName": change_set_name, "NotificationARNs": self.stack.notifications, "Tags": [ {"Key": str(k), "Value": str(v)} for k, v in self.stack.tags.items() ] } create_change_set_kwargs.update( self.stack.template.get_boto_call_parameter() ) create_change_set_kwargs.update(self._get_role_arn()) self.logger.debug( "%s - Creating Change Set '%s'", self.stack.name, change_set_name ) self.connection_manager.call( service="cloudformation", command="create_change_set", kwargs=create_change_set_kwargs ) # After the call successfully completes, AWS CloudFormation # starts creating the Change Set. self.logger.info( "%s - Successfully initiated creation of Change Set '%s'", self.stack.name, change_set_name )
[docs] def delete_change_set(self, change_set_name): """ Deletes the Change Set ``change_set_name``. :param change_set_name: The name of the Change Set. :type change_set_name: str """ self.logger.debug( "%s - Deleting Change Set '%s'", self.stack.name, change_set_name ) self.connection_manager.call( service="cloudformation", command="delete_change_set", kwargs={ "ChangeSetName": change_set_name, "StackName": self.stack.external_name } ) # If the call successfully completes, AWS CloudFormation # successfully deleted the Change Set. self.logger.info( "%s - Successfully deleted Change Set '%s'", self.stack.name, change_set_name )
[docs] def describe_change_set(self, change_set_name): """ Describes the Change Set ``change_set_name``. :param change_set_name: The name of the Change Set. :type change_set_name: str :returns: The description of the Change Set. :rtype: dict """ self.logger.debug( "%s - Describing Change Set '%s'", self.stack.name, change_set_name ) return self.connection_manager.call( service="cloudformation", command="describe_change_set", kwargs={ "ChangeSetName": change_set_name, "StackName": self.stack.external_name } )
[docs] def execute_change_set(self, change_set_name): """ Executes the Change Set ``change_set_name``. :param change_set_name: The name of the Change Set. :type change_set_name: str :returns: The Stack status :rtype: str """ self._protect_execution() change_set = self.describe_change_set(change_set_name) status = change_set.get("Status") reason = change_set.get("StatusReason") if status == "FAILED" and "submitted information didn't contain changes" in reason: self.logger.info( "Skipping ChangeSet on Stack: {} - there are no changes".format( change_set.get("StackName") ) ) return 0 self.logger.debug( "%s - Executing Change Set '%s'", self.stack.name, change_set_name ) self.connection_manager.call( service="cloudformation", command="execute_change_set", kwargs={ "ChangeSetName": change_set_name, "StackName": self.stack.external_name } ) status = self._wait_for_completion() return status
[docs] def list_change_sets(self): """ Lists the Stack's Change Sets. :returns: The Stack's Change Sets. :rtype: dict or list """ self.logger.debug("%s - Listing change sets", self.stack.name) try: response = self.connection_manager.call( service="cloudformation", command="list_change_sets", kwargs={ "StackName": self.stack.external_name } ) return {self.stack.name: response.get("Summaries", [])} except botocore.exceptions.ClientError: return []
[docs] def generate(self): """ Returns the Template for the Stack """ return self.stack.template.body
[docs] def validate(self): """ Validates the Stack's CloudFormation Template. Raises an error if the Template is invalid. :returns: Validation information about the Template. :rtype: dict :raises: botocore.exceptions.ClientError """ self.logger.debug("%s - Validating Template", self.stack.name) response = self.connection_manager.call( service="cloudformation", command="validate_template", kwargs=self.stack.template.get_boto_call_parameter() ) self.logger.debug( "%s - Validate Template response: %s", self.stack.name, response ) return response
[docs] def estimate_cost(self): """ Estimates a Stack's cost. :returns: An estimate of the Stack's cost. :rtype: dict :raises: botocore.exceptions.ClientError """ self.logger.debug("%s - Estimating template cost", self.stack.name) parameters = [ {'ParameterKey': key, 'ParameterValue': value} for key, value in self.stack.parameters.items() ] kwargs = self.stack.template.get_boto_call_parameter() kwargs.update({'Parameters': parameters}) response = self.connection_manager.call( service="cloudformation", command="estimate_template_cost", kwargs=kwargs ) self.logger.debug( "%s - Estimate Stack cost response: %s", self.stack.name, response ) return response
[docs] def get_status(self): """ Returns the Stack's status. :returns: The Stack's status. :rtype: sceptre.stack_status.StackStatus """ try: return self._get_status() except StackDoesNotExistError: return "PENDING"
def _format_parameters(self, parameters): """ Converts CloudFormation parameters to the format used by Boto3. :param parameters: A dictionary of parameters. :type parameters: dict :returns: A list of the formatted parameters. :rtype: list """ formatted_parameters = [] for name, value in parameters.items(): if value is None: continue if isinstance(value, list): value = ",".join(value) formatted_parameters.append({ "ParameterKey": name, "ParameterValue": value }) return formatted_parameters def _get_role_arn(self): """ Returns the Role ARN assumed by CloudFormation when building a Stack. Returns an empty dict if no Role is to be assumed. :returns: The a Role ARN :rtype: dict """ if self.stack.role_arn: return { "RoleARN": self.stack.role_arn } else: return {} def _get_stack_timeout(self): """ Return the timeout before considering the Stack to be failing. Returns an empty dict if no timeout is set. :returns: the creation/update timeout :rtype: dict """ if self.stack.stack_timeout: return { "TimeoutInMinutes": self.stack.stack_timeout } else: return {} def _protect_execution(self): """ Raises a ProtectedStackError if protect == True. :raises: sceptre.exceptions.ProtectedStackError """ if self.stack.protected: raise ProtectedStackError( "Cannot perform action on '{0}': Stack protection is " "currently enabled".format(self.stack.name) ) def _wait_for_completion(self, timeout=0): """ Waits for a Stack operation to finish. Prints CloudFormation events while it waits. :param timeout: Timeout before returning, in minutes. :returns: The final Stack status. :rtype: sceptre.stack_status.StackStatus """ timeout = 60 * timeout def timed_out(elapsed): return elapsed >= timeout if timeout else False status = StackStatus.IN_PROGRESS self.most_recent_event_datetime = ( datetime.now(tzutc()) - timedelta(seconds=3) ) elapsed = 0 while status == StackStatus.IN_PROGRESS and not timed_out(elapsed): status = self._get_simplified_status(self._get_status()) self._log_new_events() time.sleep(4) elapsed += 4 return status def _describe(self): return self.connection_manager.call( service="cloudformation", command="describe_stacks", kwargs={"StackName": self.stack.external_name} ) def _get_status(self): try: status = self._describe()["Stacks"][0]["StackStatus"] except botocore.exceptions.ClientError as exp: if exp.response["Error"]["Message"].endswith("does not exist"): raise StackDoesNotExistError(exp.response["Error"]["Message"]) else: raise exp return status @staticmethod def _get_simplified_status(status): """ Returns the simplified Stack Status. The simplified Stack status is represented by the struct ``sceptre.StackStatus()`` and can take one of the following options: * complete * in_progress * failed :param status: The CloudFormation Stack status to simplify. :type status: str :returns: The Stack's simplified status :rtype: sceptre.stack_status.StackStatus """ if status.endswith("ROLLBACK_COMPLETE"): return StackStatus.FAILED elif status.endswith("_COMPLETE"): return StackStatus.COMPLETE elif status.endswith("_IN_PROGRESS"): return StackStatus.IN_PROGRESS elif status.endswith("_FAILED"): return StackStatus.FAILED else: raise UnknownStackStatusError( "{0} is unknown".format(status) ) def _log_new_events(self): """ Log the latest Stack events while the Stack is being built. """ events = self.describe_events()["StackEvents"] events.reverse() new_events = [ event for event in events if event["Timestamp"] > self.most_recent_event_datetime ] for event in new_events: self.logger.info(" ".join([ self.stack.name, event["LogicalResourceId"], event["ResourceType"], event["ResourceStatus"], event.get("ResourceStatusReason", "") ])) self.most_recent_event_datetime = event["Timestamp"]
[docs] def wait_for_cs_completion(self, change_set_name): """ Waits while the Stack Change Set status is "pending". :param change_set_name: The name of the Change Set. :type change_set_name: str :returns: The Change Set's status. :rtype: sceptre.stack_status.StackChangeSetStatus """ while True: status = self._get_cs_status(change_set_name) if status != StackChangeSetStatus.PENDING: break time.sleep(2) return status
def _get_cs_status(self, change_set_name): """ Returns the status of a Change Set. :param change_set_name: The name of the Change Set. :type change_set_name: str :returns: The Change Set's status. :rtype: sceptre.stack_status.StackChangeSetStatus """ cs_description = self.describe_change_set(change_set_name) cs_status = cs_description["Status"] cs_exec_status = cs_description["ExecutionStatus"] possible_statuses = [ "CREATE_PENDING", "CREATE_IN_PROGRESS", "CREATE_COMPLETE", "DELETE_COMPLETE", "FAILED" ] possible_execution_statuses = [ "UNAVAILABLE", "AVAILABLE", "EXECUTE_IN_PROGRESS", "EXECUTE_COMPLETE", "EXECUTE_FAILED", "OBSOLETE" ] if cs_status not in possible_statuses: raise UnknownStackChangeSetStatusError( "Status {0} is unknown".format(cs_status) ) if cs_exec_status not in possible_execution_statuses: raise UnknownStackChangeSetStatusError( "ExecutionStatus {0} is unknown".format(cs_status) ) if ( cs_status == "CREATE_COMPLETE" and cs_exec_status == "AVAILABLE" ): return StackChangeSetStatus.READY elif ( cs_status in [ "CREATE_PENDING", "CREATE_IN_PROGRESS", "CREATE_COMPLETE" ] and cs_exec_status in ["UNAVAILABLE", "AVAILABLE"] ): return StackChangeSetStatus.PENDING elif ( cs_status in ["DELETE_COMPLETE", "FAILED"] or cs_exec_status in [ "EXECUTE_IN_PROGRESS", "EXECUTE_COMPLETE", "EXECUTE_FAILED", "OBSOLETE" ] ): return StackChangeSetStatus.DEFUNCT else: # pragma: no cover raise Exception("This else should not be reachable.")