Automating AWS Cost Optimization: A Deep Dive into Unused Resource Detection
Written by:
Igor Gorovyy
DevOps Engineer Lead & Senior Solutions Architect
Objective
Cloud-native applications tend to grow rapidly, and so do their costs. One major contributor to unnecessary AWS spend is abandoned or underutilized resources: EC2 instances left running, unattached volumes, idle Lambda functions, and more.
This article walks through an advanced Python-based tool built to identify and inventory unused (abandoned) resources in your AWS account, improve cost visibility, and automate FinOps practices.
🧰 The article provides both a conceptual approach and a partial implementation of the scanner, written in Python using boto3, CloudWatch, STS, colorama, and structured JSON logging.
⚠️ Clarifications or Partial Accuracy
Statement | Comment |
---|---|
"An S3 bucket can be considered unused if there are no activity metrics and it is empty." | β οΈ This is a general heuristic, but not a 100% guarantee β CloudWatch Metrics might be limited unless S3 Storage Class Analysis or advanced logging is enabled. |
"A KMS Key is unused if CloudWatch shows zero usage." | β οΈ Use caution: not all key usage is logged as metrics. For example, automatic encryption in S3 or EBS may not appear in CloudWatch metrics. |
"Default VPC can be safely deleted." | β οΈ Only true if you're absolutely sure no resources are using it β some AWS services use the default VPC automatically. |
"Dedicated Hosts are considered unused if there are no running instances." | β οΈ Correct, but sometimes they are reserved ahead of time for specific workloads β context is important. |
If you're interested in the full implementation, feel free to leave a comment β I'll gladly share the repository.
Why a Custom Scanner?
While AWS provides several built-in tools like Trusted Advisor, Compute Optimizer, and Cost Explorer, they often:
- Require a premium support plan
- Are limited in scope (e.g., only EC2, EBS, or CPU-related metrics)
- Do not offer automation, JSON reports, or Slack/email alerts
This custom scanner fills those gaps.
Key Features
✅ Multi-region scanning per service
✅ Service support: EC2, RDS, Lambda, S3, IAM, ACM, DynamoDB, VPC, EBS, ELB, ECR, and more
✅ CloudWatch metrics evaluation (e.g., CPU, connections, invocations)
✅ JSON output report with timestamp and account ID
✅ Execution time tracking per function
✅ Slack + SES email notification support
✅ S3 export for FinOps dashboards
✅ EventBridge-compatible for scheduling
✅ Language support: English 🇺🇸 + Ukrainian 🇺🇦
How It Works
Diagram
flowchart TD
%% Main Flow
A[⏰ EventBridge Schedule<br>Weekly/Monthly] --> B[🚀 AWS Lambda / ECS Task]
B --> C[🧠 FinOps Scanner Script]
%% Resource Scanning
subgraph "Resource Scanning Phase"
C --> D1[🔍 Scan AWS Services]
C --> D2[📊 CloudWatch Metrics]
C --> D3[🧾 Resource Analysis]
D1 --> D3
D2 --> D3
end
%% Analysis & Reporting
subgraph "Analysis & Reporting Phase"
D3 --> E1[💾 JSON Report to S3]
D3 --> E2[📢 Slack Alerts]
D3 --> E3[📧 Email via SES]
end
%% Data Pipeline
subgraph "Data Pipeline"
E1 --> F1[S3 Bucket]
F1 --> F2[Glue Crawler]
F2 --> F3[Athena]
F3 --> F4[Visualization]
subgraph "Visualization Options"
F4 --> G1[QuickSight]
F4 --> G2[Grafana]
end
end
%% Styling
classDef schedule fill:#f8f9fa,stroke:#6c757d,stroke-width:2px
classDef compute fill:#e9ecef,stroke:#495057,stroke-width:2px
classDef scanner fill:#dee2e6,stroke:#343a40,stroke-width:2px
classDef scan fill:#ced4da,stroke:#212529,stroke-width:2px
classDef report fill:#adb5bd,stroke:#1a1e21,stroke-width:2px
classDef data fill:#6c757d,stroke:#f8f9fa,stroke-width:2px
classDef viz fill:#495057,stroke:#e9ecef,stroke-width:2px
class A schedule
class B compute
class C scanner
class D1,D2,D3 scan
class E1,E2,E3 report
class F1,F2,F3,F4 data
class G1,G2 viz
1. Startup
- Detect AWS account ID
- Configure logging with timestamped files
- Parse language from environment variable (default: UA)
2. Execution Timing
Each function is wrapped in a decorator to measure and log runtime.
3. Per-service Resource Analysis
Each major AWS service has a dedicated find_*()
function:
Service | What It Checks |
---|---|
EC2 | Running instances with CPU usage < threshold |
RDS | Low CPU or DB connections |
Lambda | No invocations in last 7 days |
ElastiCache | Clusters with CPU < 5% |
EBS Volumes | Unattached (status: available) |
Snapshots | Older than 30 days (EC2, RDS, ElastiCache) |
S3 | Empty buckets with no access in 90 days |
IAM | Roles, users, policies, and keys not used |
ACM/KMS | Unused certificates and keys |
DynamoDB | No read capacity used |
ECR | No image push in last 90 days |
ALBs / TGs | No request traffic via CloudWatch |
VPC | Unused subnets, SGs, network interfaces |
Output
At the end of a scan, the script:
- Saves the full results to results/aws_waste_results_<account_id>_<timestamp>.json
- Includes execution time per function
- Logs a summary (number of items found, time taken)
{
"ec2_instances": [],
"rds_instances": [],
"eks_clusters": [],
"lambda_functions": [
{
"FunctionName": "Demo-LambdaFunction-ASHKskIYvVLJ",
"Region": "eu-central-1"
}
],
"elasticache_clusters": [],
"ec2_snapshots": [],
"rds_snapshots": [],
"elasticache_snapshots": [],
"kms_keys": [
{
"KeyId": "mrk-8d3588c3caf64d86b82c30994703057a",
"Region": "eu-central-1",
"Description": "CMK Infra S3 tfstate"
}
],
"s3_buckets": [],
"dynamodb_tables": [],
"iam_roles": [
{
"RoleName": "AWSServiceRoleForCloudTrail",
"LastUsed": "2024-11-26"
},
{
"RoleName": "AWSServiceRoleForComputeOptimizer",
"LastUsed": "Never"
},
{
"RoleName": "AWSServiceRoleForKeyManagementServiceMultiRegionKeys",
"LastUsed": "2024-12-11"
},
{
"RoleName": "AWSServiceRoleForSupport",
"LastUsed": "Never"
},
{
"RoleName": "AWSServiceRoleForTrustedAdvisor",
"LastUsed": "Never"
},
{
"RoleName": " Demo-CrossAccountRole-rwI5LmDqcHJR",
"LastUsed": "2024-12-04"
},
{
"RoleName": " Demo-LambdaExecutionRole-v4VLtulWfCW2",
"LastUsed": "2024-11-25"
}
],
"iam_policies": [],
"ebs_volumes": [],
"amis": [],
"alb": [],
"target_groups": [],
"acm_certificates": [],
"vpc_resources": {
"vpc": [],
"subnets": [],
"security_groups": [],
"network_interfaces": []
},
"ec2_resources": {
"elastic_ips": [],
"placement_groups": [],
"dedicated_hosts": []
},
"iam_resources": {
"users": [
{
"UserName": "provisioner",
"LastUsed": "Never"
}
],
"groups": [],
"access_keys": []
},
"ecr_repositories": [],
"execution_times": {
"find_idle_ec2_instances": 26.1112859249115,
"find_unused_rds_instances": 14.077875852584839,
"find_idle_eks_clusters": 5.355216979980469,
"find_unused_lambda_functions": 21.097691774368286,
"find_unused_elasticache_clusters": 13.915351152420044,
"find_unused_ec2_snapshots": 27.415552139282227,
"find_unused_rds_snapshots": 14.19884705543518,
"find_unused_elasticache_snapshots": 12.656662940979004,
"find_unused_kms_keys": 31.967848300933838,
"find_unused_s3_buckets": 0.30380916595458984,
"find_unused_dynamodb_tables": 44.07150626182556,
"find_unused_iam_roles": 2.6107289791107178,
"find_unused_iam_policies": 0.53951096534729,
"find_unused_ebs_volumes": 27.511493682861328,
"find_unused_amis": 25.320037841796875,
"find_unused_alb": 25.871586084365845,
"find_unused_target_groups": 24.60790991783142,
"find_unused_acm_certificates": 20.009007215499878,
"find_unused_vpc_resources": 41.87678790092468,
"find_unused_ec2_resources": 32.71801400184631,
"find_unused_iam_resources": 1.2918949127197266,
"find_unused_ecr_repositories": 13.991612195968628
}
}
FinOps Dashboard Integration
You can easily feed this data into a dashboard:
- Upload to S3
- Use Glue Crawler to define schema
- Query with Athena
- Build reports in QuickSight or Grafana
Examples (a query sketch follows this list):
- 📊 Top 10 idle EC2 by cost
- 🗺️ Heatmap of resource sprawl per region
- 📈 Usage trend over time
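As a minimal sketch of this pipeline in Python (the bucket, database, and table names below are placeholders; the aws_waste_results table and its lowercased columns are what a Glue crawler would typically derive from the JSON report, so verify against your own crawler output):

import boto3

RESULTS_BUCKET = 'your-finops-bucket'   # placeholder
ATHENA_DATABASE = 'finops'              # placeholder
ATHENA_OUTPUT = 's3://your-finops-bucket/athena-results/'

def upload_report(local_path, key):
    """Upload a scanner JSON report to S3 for the Glue crawler to pick up."""
    boto3.client('s3').upload_file(local_path, RESULTS_BUCKET, key)

def query_idle_lambdas():
    """Run an example Athena query over the crawled results table."""
    athena = boto3.client('athena')
    return athena.start_query_execution(
        QueryString=(
            "SELECT item.functionname, item.region "
            "FROM aws_waste_results "
            "CROSS JOIN UNNEST(lambda_functions) AS t(item)"
        ),
        QueryExecutionContext={'Database': ATHENA_DATABASE},
        ResultConfiguration={'OutputLocation': ATHENA_OUTPUT},
    )

From there, QuickSight or Grafana can read the Athena table directly as a data source.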
IAM Permissions Required
The scanner requires read-only + CloudWatch + S3 + SES permissions. A sample IAM policy includes:
- ec2:Describe*, rds:Describe*, lambda:ListFunctions
- cloudwatch:GetMetricStatistics
- s3:PutObject, ses:SendEmail
- logs:* (for Lambda logging)
This is the full IAM policy:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ReadResourceMetadata",
"Effect": "Allow",
"Action": [
"ec2:Describe*",
"rds:Describe*",
"eks:Describe*",
"eks:List*",
"lambda:ListFunctions",
"lambda:GetFunction",
"elasticache:Describe*",
"dynamodb:ListTables",
"dynamodb:DescribeTable",
"acm:ListCertificates",
"acm:DescribeCertificate",
"kms:ListKeys",
"kms:DescribeKey",
"s3:ListAllMyBuckets",
"s3:ListBucket",
"s3:GetBucketLocation",
"s3:GetBucketTagging",
"s3:GetBucketAcl",
"ecr:DescribeRepositories",
"ecr:DescribeImages",
"iam:ListRoles",
"iam:GetRole",
"iam:ListPolicies",
"iam:ListEntitiesForPolicy",
"iam:ListUsers",
"iam:GetUser",
"iam:ListAccessKeys",
"iam:GetAccessKeyLastUsed",
"iam:ListGroups",
"iam:GetGroup"
],
"Resource": "*"
},
{
"Sid": "CloudWatchMetricsAccess",
"Effect": "Allow",
"Action": [
"cloudwatch:GetMetricStatistics",
"cloudwatch:ListMetrics"
],
"Resource": "*"
},
{
"Sid": "AllowSESSendEmail",
"Effect": "Allow",
"Action": [
"ses:SendEmail",
"ses:SendRawEmail"
],
"Resource": "*"
},
{
"Sid": "AllowS3UploadResults",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject"
],
"Resource": "arn:aws:s3:::your-bucket-name/*"
},
{
"Sid": "AllowLogAccess",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
},
{
"Sid": "AllowSTSID",
"Effect": "Allow",
"Action": [
"sts:GetCallerIdentity"
],
"Resource": "*"
}
]
}
Feature Comparison
Feature / Tool | Trusted Advisor | Compute Optimizer | Cost Explorer | Resource Explorer | This Script |
---|---|---|---|---|---|
EC2, RDS, Lambda scan | ✅ (partial) | ✅ (CPU only) | ⚠️ (by trend) | ❌ | ✅ |
Unused S3 detection | ❌ | ❌ | ❌ | ❌ | ✅ |
IAM usage visibility | ❌ | ❌ | ❌ | ❌ | ✅ |
ACM/KMS scan | ❌ | ❌ | ❌ | ❌ | ✅ |
Snapshot cleanup | ❌ | ❌ | ❌ | ❌ | ✅ |
Slack/Email notifications | ❌ | ❌ | ❌ | ❌ | ✅ |
JSON reports | ❌ | ❌ | ❌ | ❌ | ✅ |
Automation & scheduling | ❌ | ❌ | ❌ | ❌ | ✅ EventBridge |
Dashboard-ready | ⚠️ | ❌ | ✅ | ⚠️ | ✅ Athena/S3 |
Premium support needed | ✅ Yes | ❌ | ❌ | ❌ | ❌ |
Extendable (custom logic) | ❌ | ❌ | ❌ | ❌ | ✅ |
Deployment Options
- Run as Lambda (use included Dockerfile or zip package)
- Schedule via EventBridge (see the sketch after this list)
- Use Terraform for IAM setup
- Customize thresholds (CPU%, days, etc.)
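As a minimal sketch of the scheduling path, assuming the scanner is already packaged as a Lambda function (the rule name and ARN below are placeholders):

import boto3

RULE_NAME = 'finops-scanner-weekly'  # placeholder
LAMBDA_ARN = 'arn:aws:lambda:eu-central-1:123456789012:function:finops-scanner'  # placeholder

def schedule_weekly_scan():
    """Create an EventBridge rule firing every Monday at 06:00 UTC."""
    events = boto3.client('events')
    events.put_rule(
        Name=RULE_NAME,
        ScheduleExpression='cron(0 6 ? * MON *)',
        State='ENABLED',
    )
    events.put_targets(
        Rule=RULE_NAME,
        Targets=[{'Id': 'finops-scanner', 'Arn': LAMBDA_ARN}],
    )

The function also needs a resource-based permission so the rule may invoke it (lambda add-permission), which the Terraform setup can manage alongside the IAM policy.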
Best Practices
- Use resource tagging: Owner, Purpose, TTL (a tag-check sketch follows this list)
- Review flagged resources before deletion
- Schedule weekly/monthly scans
- Visualize trends using S3+Athena dashboards
- Combine with native tools for full FinOps coverage
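To make the tagging practice checkable, a small helper can flag findings that lack the expected tags. This sketch works on the Tags list that find_ec2_instances() already returns; the helper name is illustrative:

REQUIRED_TAGS = {'Owner', 'Purpose', 'TTL'}

def missing_required_tags(instance_info):
    """Return the required tag keys absent from an EC2 finding.

    Expects the 'Tags' format produced by find_ec2_instances():
    [{'Key': 'Owner', 'Value': 'team-a'}, ...]
    """
    present = {tag['Key'] for tag in instance_info.get('Tags', [])}
    return REQUIRED_TAGS - present

Any non-empty result marks a finding to triage with its owner before deletion.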
Bonus: Add-on Tools
- Slack bot alerting for urgent findings (sketched below)
- SES email digests (sketched below)
- GitHub Actions CI/CD for Lambda deployment
- Terraform module for multi-account integration
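Neither notification path is included in the scanner listing at the end of this article, so here is a minimal sketch of both, assuming a Slack incoming-webhook URL and a verified SES sender identity (all addresses are placeholders):

import json
import urllib.request
import boto3

SLACK_WEBHOOK_URL = 'https://hooks.slack.com/services/XXX/YYY/ZZZ'  # placeholder
SES_SENDER = 'finops@example.com'     # must be verified in SES
SES_RECIPIENT = 'team@example.com'    # placeholder

def notify_slack(text):
    """Post a plain-text message to a Slack incoming webhook."""
    payload = json.dumps({'text': text}).encode('utf-8')
    request = urllib.request.Request(
        SLACK_WEBHOOK_URL, data=payload,
        headers={'Content-Type': 'application/json'},
    )
    urllib.request.urlopen(request)

def send_email_digest(subject, body):
    """Send a findings digest through SES."""
    ses = boto3.client('ses', region_name='eu-central-1')
    ses.send_email(
        Source=SES_SENDER,
        Destination={'ToAddresses': [SES_RECIPIENT]},
        Message={
            'Subject': {'Data': subject},
            'Body': {'Text': {'Data': body}},
        },
    )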
Conclusion
This script goes beyond what AWS native tools offer, empowering cloud teams to:
- Improve cost visibility
- Take action on idle resources
- Integrate FinOps with engineering workflows
- Automate everything
Additional: Scanner Code
import os
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
from datetime import datetime, timedelta, timezone
import logging
from colorama import init, Fore, Style
import sys
import json
import time
from functools import wraps
# Global variables
RUN_COUNTER = 0
LANG = os.getenv('LANG', 'UA').upper().split('.')[0]  # Take the language code before the dot (e.g. 'UA' from 'UA.UTF-8')
if LANG not in ['EN', 'UA']:  # Fall back to EN if the language is not supported
    LANG = 'EN'
# Execution time statistics
execution_times = {}
def measure_time(func):
"""Decorator to measure execution time of functions"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
execution_time = end_time - start_time
# Store execution time
execution_times[func.__name__] = execution_time
# Log execution time
logging.info(f"{Fore.CYAN}Function {func.__name__} executed in {execution_time:.2f} seconds{Style.RESET_ALL}")
return result
return wrapper
# Language-specific messages
MESSAGES = {
'EN': {
'log_saved': "Log saved to file: {}",
'logging_error': "Error setting up logging: {}",
'account_id_error': "Error getting account ID: {}",
'region_start': "Starting check in region {} for {}",
'resource_found': "Found unused {}: {} in region {}",
'error_occurred': "Error occurred in region {}: {}",
'check_completed': "Check completed in region {}",
'run_start': "Starting check run #{}",
'run_end': "Check run #{} completed",
'results_saved': "Results saved to file: {}",
'no_instances': "No running instances found in region: {}"
},
'UA': {
'log_saved': "ΠΠΎΠ³ Π·Π±Π΅ΡΠ΅ΠΆΠ΅Π½ΠΎ Ρ ΡΠ°ΠΉΠ»: {}",
'logging_error': "ΠΠΎΠΌΠΈΠ»ΠΊΠ° Π½Π°Π»Π°ΡΡΡΠ²Π°Π½Π½Ρ Π»ΠΎΠ³ΡΠ²Π°Π½Π½Ρ: {}",
'account_id_error': "ΠΠΎΠΌΠΈΠ»ΠΊΠ° ΠΎΡΡΠΈΠΌΠ°Π½Π½Ρ ID Π°ΠΊΠ°ΡΠ½ΡΡ: {}",
'region_start': "ΠΠΎΡΠ°ΡΠΎΠΊ ΠΏΠ΅ΡΠ΅Π²ΡΡΠΊΠΈ Π² ΡΠ΅Π³ΡΠΎΠ½Ρ {} Π΄Π»Ρ {}",
'resource_found': "ΠΠ½Π°ΠΉΠ΄Π΅Π½ΠΎ Π½Π΅Π²ΠΈΠΊΠΎΡΠΈΡΡΠ°Π½ΠΈΠΉ {}: {} Π² ΡΠ΅Π³ΡΠΎΠ½Ρ {}",
'error_occurred': "ΠΠΎΠΌΠΈΠ»ΠΊΠ° Π² ΡΠ΅Π³ΡΠΎΠ½Ρ {}: {}",
'check_completed': "ΠΠ΅ΡΠ΅Π²ΡΡΠΊΠ° Π·Π°Π²Π΅ΡΡΠ΅Π½Π° Π² ΡΠ΅Π³ΡΠΎΠ½Ρ {}",
'run_start': "ΠΠΎΡΠ°ΡΠΎΠΊ ΠΏΠ΅ΡΠ΅Π²ΡΡΠΊΠΈ #{}",
'run_end': "ΠΠ΅ΡΠ΅Π²ΡΡΠΊΠ° #{} Π·Π°Π²Π΅ΡΡΠ΅Π½Π°",
'results_saved': "Π Π΅Π·ΡΠ»ΡΡΠ°ΡΠΈ Π·Π±Π΅ΡΠ΅ΠΆΠ΅Π½ΠΎ Ρ ΡΠ°ΠΉΠ»: {}",
'no_instances': "ΠΠ΅ Π·Π½Π°ΠΉΠ΄Π΅Π½ΠΎ Π·Π°ΠΏΡΡΠ΅Π½ΠΈΡ
ΡΠ½ΡΡΠ°Π½ΡΡΠ² Π² ΡΠ΅Π³ΡΠΎΠ½Ρ: {}"
}
}
def get_message(key):
"""Get localized message based on LANG setting"""
return MESSAGES[LANG][key]
# Initialize colorama
init()
def get_aws_account_id():
"""Get AWS account ID"""
try:
sts = boto3.client('sts')
return sts.get_caller_identity()['Account']
except Exception as e:
log_error(get_message('account_id_error').format(str(e)))
return "unknown"
def setup_logging():
"""Configure logging with dynamic filename"""
try:
account_id = get_aws_account_id()
current_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
log_filename = f"aws_waste_finder_{account_id}_{current_time}.log"
# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)
# Save file in logs directory
log_filepath = os.path.join('logs', log_filename)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_filepath),
logging.StreamHandler(sys.stdout)
]
)
logging.info(f"{Fore.MAGENTA}{get_message('log_saved').format(log_filepath)}{Style.RESET_ALL}")
except Exception as e:
print(get_message('logging_error').format(str(e)))
# If dynamic filename setup fails, use standard logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('aws_waste_finder.log'),
logging.StreamHandler(sys.stdout)
]
)
# Configure logging
setup_logging()
def log_region_start(region, service):
"""Log start of region check"""
logging.info(f"{Fore.BLUE}{get_message('region_start').format(region, service)}{Style.RESET_ALL}")
def log_unused_resource(resource_type, resource_id, region, details=""):
"""Log found unused resource"""
message = get_message('resource_found').format(resource_type, resource_id, region)
if details:
message += f" ({details})"
logging.info(f"{Fore.YELLOW}{message}{Style.RESET_ALL}")
def log_error(message, region="global"):
"""Log error message"""
if region != "global":
message = get_message('error_occurred').format(region, message)
logging.error(f"{Fore.RED}{message}{Style.RESET_ALL}")
def log_region_end(region):
"""Log end of region check"""
logging.info(f"{Fore.BLUE}{get_message('check_completed').format(region)}{Style.RESET_ALL}")
def log_run_start():
"""Log start of check run"""
logging.info(f"{Fore.MAGENTA}{get_message('run_start').format(RUN_COUNTER)}{Style.RESET_ALL}")
def log_run_end():
"""Log end of check run"""
logging.info(f"{Fore.MAGENTA}{get_message('run_end').format(RUN_COUNTER)}{Style.RESET_ALL}")
def save_results_to_file(results):
"""Save results to JSON file"""
try:
account_id = get_aws_account_id()
current_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
# Create results directory if it doesn't exist
os.makedirs('results', exist_ok=True)
# Save file in results directory
filename = f"results/aws_waste_results_{account_id}_{current_time}.json"
# Add execution times to results
results['execution_times'] = execution_times
with open(filename, 'w') as f:
json.dump(results, f, indent=4, default=str)
logging.info(f"{Fore.GREEN}{get_message('results_saved').format(filename)}{Style.RESET_ALL}")
# Log execution time statistics
logging.info(f"{Fore.MAGENTA}Execution time statistics:{Style.RESET_ALL}")
for func_name, exec_time in execution_times.items():
logging.info(f"{Fore.CYAN}{func_name}: {exec_time:.2f} seconds{Style.RESET_ALL}")
except Exception as e:
log_error(f"Error saving results: {str(e)}")
def get_available_regions(service_name):
""" Get available regions for a given service with STS check. """
try:
# Get all regions for the service
session = boto3.session.Session()
regions = session.get_available_regions(service_name)
# For IAM and S3 return only us-east-1 as they are global services
# if service_name in ['iam', 's3']:
# return ['us-east-1']
# For EKS use only regions where it's available
if service_name == 'eks':
eks_regions = [
'us-east-1', 'us-east-2', 'us-west-2',
'eu-west-1', 'eu-central-1', 'eu-north-1',
'ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2'
]
return [region for region in regions if region in eks_regions]
# Get list of available regions through EC2
ec2 = boto3.client('ec2', region_name='us-east-1')
try:
# Get list of regions where EC2 is available
ec2_regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']]
# Filter regions for current service
available_regions = [region for region in regions if region in ec2_regions]
# Check availability of each region
working_regions = []
for region in available_regions:
try:
# Try to create a client for the region
test_client = boto3.client(service_name, region_name=region)
# Execute a simple request to check availability
if service_name == 'ec2':
test_client.describe_regions()
elif service_name == 'rds':
                        test_client.describe_db_instances(MaxRecords=20)  # 20 is the minimum allowed value
elif service_name == 'lambda':
test_client.list_functions(MaxItems=1)
elif service_name == 'elasticache':
                        test_client.describe_cache_clusters(MaxRecords=20)  # 20 is the minimum allowed value
elif service_name == 'dynamodb':
test_client.list_tables(Limit=1)
elif service_name == 'kms':
test_client.list_keys(Limit=1)
elif service_name == 'elbv2':
test_client.describe_load_balancers(PageSize=1)
elif service_name == 'acm':
test_client.list_certificates(MaxItems=1)
working_regions.append(region)
logging.info(f"{Fore.GREEN}Region {region} is available for {service_name}{Style.RESET_ALL}")
except ClientError as e:
if e.response['Error']['Code'] == 'AuthFailure':
logging.warning(f"{Fore.YELLOW}Region {region} is not available: AuthFailure{Style.RESET_ALL}")
else:
logging.warning(f"{Fore.YELLOW}Region {region} is not available: {str(e)}{Style.RESET_ALL}")
except Exception as e:
logging.warning(f"{Fore.YELLOW}Region {region} is not available: {str(e)}{Style.RESET_ALL}")
return working_regions
except ClientError as e:
logging.error(f"{Fore.RED}Error getting EC2 regions: {str(e)}{Style.RESET_ALL}")
# If failed to get regions through EC2, return all regions
return regions
except Exception as e:
log_error(f"Error getting regions for {service_name}: {str(e)}", "global")
return []
def get_regions(service_name):
""" Get all available regions for a given service. """
return get_available_regions(service_name)
@measure_time
def find_ec2_instances():
"""Find all EC2 instances and their status"""
ec2_regions = get_regions('ec2')
instances_info = []
for region in ec2_regions:
log_region_start(region, "EC2")
try:
ec2 = boto3.client('ec2', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
instances = ec2.describe_instances()
if not instances['Reservations']:
logging.info(f"{Fore.YELLOW}No instances found in region: {region}{Style.RESET_ALL}")
else:
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
instance_id = instance['InstanceId']
state = instance['State']['Name']
instance_info = {
'InstanceId': instance_id,
'Region': region,
'State': state,
'InstanceType': instance.get('InstanceType', 'unknown'),
'LaunchTime': instance.get('LaunchTime', 'unknown'),
'Tags': instance.get('Tags', [])
}
if state == 'running':
# Get CPU metrics
cpu_stats = cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='CPUUtilization',
Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
StartTime=datetime.now(timezone.utc) - timedelta(days=7),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Average']
)
if cpu_stats['Datapoints']:
avg_cpu = sum([data_point['Average'] for data_point in cpu_stats['Datapoints']]) / len(cpu_stats['Datapoints'])
instance_info['CPUUtilization'] = round(avg_cpu, 2)
else:
instance_info['CPUUtilization'] = None
instances_info.append(instance_info)
logging.info(f"{Fore.GREEN}EC2 instance: {instance_id} in region {region}, "
f"State: {state}, Type: {instance.get('InstanceType', 'unknown')}, "
f"CPU: {instance_info.get('CPUUtilization', 'N/A')}%{Style.RESET_ALL}")
except ClientError as e:
if e.response['Error']['Code'] == 'AuthFailure':
log_error(f"AuthFailure, skipping...", region)
else:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return instances_info
@measure_time
def find_rds_instances():
"""Find all RDS instances and their status"""
rds_regions = get_regions('rds')
instances_info = []
for region in rds_regions:
log_region_start(region, "RDS")
try:
rds = boto3.client('rds', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
instances = rds.describe_db_instances()
for instance in instances['DBInstances']:
instance_id = instance['DBInstanceIdentifier']
instance_info = {
'DBInstanceIdentifier': instance_id,
'Region': region,
'Status': instance['DBInstanceStatus'],
'Engine': instance.get('Engine', 'unknown'),
'InstanceClass': instance.get('DBInstanceClass', 'unknown'),
'CreationTime': instance.get('InstanceCreateTime', 'unknown')
}
if instance['DBInstanceStatus'] == 'available':
# Get metrics
cpu_stats = cloudwatch.get_metric_statistics(
Namespace='AWS/RDS',
MetricName='CPUUtilization',
Dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': instance_id}],
StartTime=datetime.now(timezone.utc) - timedelta(days=7),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Average']
)
connections = cloudwatch.get_metric_statistics(
Namespace='AWS/RDS',
MetricName='DatabaseConnections',
Dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': instance_id}],
StartTime=datetime.now(timezone.utc) - timedelta(days=7),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Average']
)
if cpu_stats['Datapoints']:
avg_cpu = sum([dp['Average'] for dp in cpu_stats['Datapoints']]) / len(cpu_stats['Datapoints'])
instance_info['CPUUtilization'] = round(avg_cpu, 2)
if connections['Datapoints']:
avg_conn = sum([dp['Average'] for dp in connections['Datapoints']]) / len(connections['Datapoints'])
instance_info['AverageConnections'] = round(avg_conn, 2)
instances_info.append(instance_info)
logging.info(f"{Fore.GREEN}RDS instance: {instance_id} in region {region}, "
f"State: {instance['DBInstanceStatus']}, Type: {instance.get('DBInstanceClass', 'unknown')}, "
f"CPU: {instance_info.get('CPUUtilization', 'N/A')}%, "
f"Connections: {instance_info.get('AverageConnections', 'N/A')}{Style.RESET_ALL}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
log_region_end(region)
return instances_info
@measure_time
def find_eks_clusters():
"""Find all EKS clusters and their status"""
eks_regions = get_regions('eks')
clusters_info = []
for region in eks_regions:
log_region_start(region, "EKS")
try:
eks = boto3.client('eks', region_name=region)
clusters = eks.list_clusters()
for cluster_name in clusters['clusters']:
cluster = eks.describe_cluster(name=cluster_name)['cluster']
nodegroups = eks.list_nodegroups(clusterName=cluster_name)
cluster_info = {
'ClusterName': cluster_name,
'Region': region,
'Status': cluster['status'],
'Version': cluster.get('version', 'unknown'),
'CreatedAt': cluster.get('createdAt', 'unknown'),
'NodeGroups': len(nodegroups['nodegroups']) if 'nodegroups' in nodegroups else 0
}
clusters_info.append(cluster_info)
logging.info(f"{Fore.GREEN}EKS cluster: {cluster_name} in region {region}, "
f"Status: {cluster['status']}, Version: {cluster.get('version', 'unknown')}, "
f"Node Groups: {cluster_info['NodeGroups']}{Style.RESET_ALL}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return clusters_info
@measure_time
def find_unused_lambda_functions():
""" Find underutilized or idle Lambda functions. """
lambda_regions = get_regions('lambda')
idle_lambda = []
for region in lambda_regions:
log_region_start(region, "Lambda")
try:
lambda_client = boto3.client('lambda', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
functions = lambda_client.list_functions()
for function in functions['Functions']:
invocations = cloudwatch.get_metric_statistics(
Namespace='AWS/Lambda',
MetricName='Invocations',
Dimensions=[
{
'Name': 'FunctionName',
'Value': function['FunctionName']
}
],
StartTime=datetime.now(timezone.utc) - timedelta(days=7),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Sum']
)
total_invocations = sum([data_point['Sum'] for data_point in invocations['Datapoints']])
if total_invocations == 0:
idle_lambda.append({'FunctionName': function['FunctionName'], 'Region': region})
log_unused_resource("Lambda function", function['FunctionName'], region)
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return idle_lambda
@measure_time
def find_unused_elasticache_clusters():
""" Find underutilized or idle ElastiCache clusters. """
idle_elasticache = []
elasticache_regions = get_regions('elasticache')
for region in elasticache_regions:
log_region_start(region, "ElastiCache")
try:
elasticache = boto3.client('elasticache', region_name=region)
# Get all cache clusters with max records set to 100
try:
response = elasticache.describe_cache_clusters(MaxRecords=100)
clusters = response.get('CacheClusters', [])
while 'Marker' in response:
response = elasticache.describe_cache_clusters(
MaxRecords=100,
Marker=response['Marker']
)
clusters.extend(response.get('CacheClusters', []))
for cluster in clusters:
cluster_id = cluster['CacheClusterId']
# Get CloudWatch metrics for the cluster
cloudwatch = boto3.client('cloudwatch', region_name=region)
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=30)
# Check CPU utilization
cpu_response = cloudwatch.get_metric_statistics(
Namespace='AWS/ElastiCache',
MetricName='CPUUtilization',
Dimensions=[
{
'Name': 'CacheClusterId',
'Value': cluster_id
}
],
StartTime=start_time,
EndTime=end_time,
Period=86400, # 1 day
Statistics=['Average']
)
# If no CPU data or average CPU < 5%, consider it idle
if not cpu_response['Datapoints'] or \
all(dp['Average'] < 5.0 for dp in cpu_response['Datapoints']):
idle_elasticache.append({
'type': 'ElastiCache',
'id': cluster_id,
'region': region,
'status': cluster.get('CacheClusterStatus', 'unknown'),
'node_type': cluster.get('CacheNodeType', 'unknown'),
'engine': cluster.get('Engine', 'unknown'),
'creation_time': cluster.get('CacheClusterCreateTime', 'unknown')
})
log_unused_resource('ElastiCache', cluster_id, region,
f"Status: {cluster.get('CacheClusterStatus', 'unknown')}, "
f"Node Type: {cluster.get('CacheNodeType', 'unknown')}")
except ClientError as e:
if e.response['Error']['Code'] == 'InvalidClientTokenId':
logging.warning(f"{Fore.YELLOW}Region {region} is not available: InvalidClientTokenId{Style.RESET_ALL}")
continue
elif e.response['Error']['Code'] == 'AuthFailure':
logging.warning(f"{Fore.YELLOW}Region {region} is not available: AuthFailure{Style.RESET_ALL}")
continue
else:
raise
except ClientError as e:
if e.response['Error']['Code'] in ['InvalidClientTokenId', 'AuthFailure']:
logging.warning(f"{Fore.YELLOW}Region {region} is not available: {e.response['Error']['Code']}{Style.RESET_ALL}")
else:
log_error(f"Error checking ElastiCache in {region}: {str(e)}", region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
log_region_end(region)
return idle_elasticache
@measure_time
def find_unused_ec2_snapshots():
""" Find unused EC2 snapshots older than 30 days. """
ec2_regions = get_regions('ec2')
unused_snapshots = []
cutoff_date = datetime.now(timezone.utc) - timedelta(days=30)
for region in ec2_regions:
log_region_start(region, "EC2 Snapshots")
try:
ec2 = boto3.client('ec2', region_name=region)
snapshots = ec2.describe_snapshots(OwnerIds=['self'])
for snapshot in snapshots['Snapshots']:
if snapshot['StartTime'] < cutoff_date:
unused_snapshots.append(
{'SnapshotId': snapshot['SnapshotId'], 'Region': region, 'StartTime': snapshot['StartTime']})
log_unused_resource("EC2 snapshot", snapshot['SnapshotId'], region,
f"Created: {snapshot['StartTime'].strftime('%Y-%m-%d')}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_snapshots
@measure_time
def find_unused_rds_snapshots():
""" Find unused RDS snapshots older than 30 days. """
rds_regions = get_regions('rds')
unused_snapshots = []
cutoff_date = datetime.now(timezone.utc) - timedelta(days=30)
for region in rds_regions:
log_region_start(region, "RDS Snapshots")
try:
rds = boto3.client('rds', region_name=region)
snapshots = rds.describe_db_snapshots(SnapshotType='manual')
for snapshot in snapshots['DBSnapshots']:
if snapshot['SnapshotCreateTime'] < cutoff_date:
unused_snapshots.append({'DBSnapshotIdentifier': snapshot['DBSnapshotIdentifier'], 'Region': region,
'SnapshotCreateTime': snapshot['SnapshotCreateTime']})
log_unused_resource("RDS snapshot", snapshot['DBSnapshotIdentifier'], region,
f"Created: {snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d')}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_snapshots
@measure_time
def find_unused_elasticache_snapshots():
""" Find unused ElastiCache snapshots older than 30 days. """
elasticache_regions = get_regions('elasticache')
unused_snapshots = []
cutoff_date = datetime.now(timezone.utc) - timedelta(days=30)
    for region in elasticache_regions:
        log_region_start(region, "ElastiCache Snapshots")
        try:
            elasticache = boto3.client('elasticache', region_name=region)
            snapshots = elasticache.describe_snapshots()
            for snapshot in snapshots['Snapshots']:
                if 'SnapshotCreateTime' in snapshot and snapshot['SnapshotCreateTime'] < cutoff_date:
                    unused_snapshots.append({'SnapshotName': snapshot['SnapshotName'], 'Region': region,
                                             'SnapshotCreateTime': snapshot['SnapshotCreateTime']})
                    log_unused_resource("ElastiCache snapshot", snapshot['SnapshotName'], region,
                                        f"Created: {snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d')}")
        except ClientError as e:
            log_error(str(e), region)
        except Exception as e:
            log_error(f"Unknown error: {str(e)}", region)
    return unused_snapshots
@measure_time
def find_unused_kms_keys():
""" Find unused KMS keys. """
kms_regions = get_regions('kms')
unused_keys = []
for region in kms_regions:
log_region_start(region, "KMS")
try:
kms = boto3.client('kms', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
keys = kms.list_keys()
for key in keys['Keys']:
key_id = key['KeyId']
key_info = kms.describe_key(KeyId=key_id)
metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/KMS',
MetricName='KeyUsage',
Dimensions=[{'Name': 'KeyId', 'Value': key_id}],
StartTime=datetime.now(timezone.utc) - timedelta(days=30),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Sum']
)
if not metrics['Datapoints'] or sum([dp['Sum'] for dp in metrics['Datapoints']]) == 0:
unused_keys.append({
'KeyId': key_id,
'Region': region,
'Description': key_info['KeyMetadata'].get('Description', 'No description')
})
log_unused_resource("KMS key", key_id, region, key_info['KeyMetadata'].get('Description', 'No description'))
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_keys
@measure_time
def find_unused_s3_buckets():
""" Find unused S3 buckets based on activity metrics and last access time. """
unused_buckets = []
cutoff_date = datetime.now(timezone.utc) - timedelta(days=90)
log_region_start("global", "S3")
try:
s3 = boto3.client('s3')
        cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')  # NOTE: S3 metrics are published in each bucket's own region; us-east-1 only covers buckets located there
buckets = s3.list_buckets()
for bucket in buckets['Buckets']:
bucket_name = bucket['Name']
try:
# Check activity metrics for the last 90 days
metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/S3',
MetricName='NumberOfObjects',
Dimensions=[
{
'Name': 'BucketName',
'Value': bucket_name
},
{
'Name': 'StorageType',
'Value': 'AllStorageTypes'
}
],
StartTime=datetime.now(timezone.utc) - timedelta(days=90),
EndTime=datetime.now(timezone.utc),
Period=86400, # 1 day
Statistics=['Sum']
)
# Check request metrics
request_metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/S3',
MetricName='AllRequests',
Dimensions=[
{
'Name': 'BucketName',
'Value': bucket_name
}
],
StartTime=datetime.now(timezone.utc) - timedelta(days=90),
EndTime=datetime.now(timezone.utc),
Period=86400,
Statistics=['Sum']
)
# Check if bucket is empty
objects = s3.list_objects_v2(Bucket=bucket_name, MaxKeys=1)
is_empty = not objects.get('Contents')
# Check if bucket is not used
no_activity = (
(not metrics['Datapoints'] or sum([dp['Sum'] for dp in metrics['Datapoints']]) == 0) and
(not request_metrics['Datapoints'] or sum([dp['Sum'] for dp in request_metrics['Datapoints']]) == 0)
)
if is_empty and no_activity:
unused_buckets.append({
'BucketName': bucket_name,
'CreationDate': bucket['CreationDate'].strftime('%Y-%m-%d'),
'LastActivity': 'No activity in last 90 days'
})
log_unused_resource("S3 bucket", bucket_name, "global",
f"Created: {bucket['CreationDate'].strftime('%Y-%m-%d')}, "
f"No activity in last 90 days")
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchBucket':
continue
log_error(f"Error checking bucket {bucket_name}: {str(e)}")
except Exception as e:
log_error(f"Unexpected error checking bucket {bucket_name}: {str(e)}")
except ClientError as e:
log_error(str(e))
except Exception as e:
log_error(f"Unknown error: {str(e)}")
return unused_buckets
@measure_time
def find_unused_dynamodb_tables():
""" Find unused DynamoDB tables. """
dynamodb_regions = get_regions('dynamodb')
unused_tables = []
for region in dynamodb_regions:
log_region_start(region, "DynamoDB")
try:
dynamodb = boto3.client('dynamodb', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
tables = dynamodb.list_tables()
for table_name in tables['TableNames']:
metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/DynamoDB',
MetricName='ConsumedReadCapacityUnits',
Dimensions=[{'Name': 'TableName', 'Value': table_name}],
StartTime=datetime.now(timezone.utc) - timedelta(days=30),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Sum']
)
if not metrics['Datapoints'] or sum([dp['Sum'] for dp in metrics['Datapoints']]) == 0:
unused_tables.append({
'TableName': table_name,
'Region': region
})
log_unused_resource("DynamoDB table", table_name, region)
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_tables
@measure_time
def find_unused_iam_roles():
"""Find IAM roles not used in the last 90 days"""
unused_roles = []
cutoff_date = datetime.now(timezone.utc) - timedelta(days=90)
try:
iam = boto3.client('iam')
roles = iam.list_roles()
for role in roles['Roles']:
try:
role_name = role['RoleName']
role_info = iam.get_role(RoleName=role_name)
# Check if role has been used
last_used = role_info['Role'].get('RoleLastUsed', {}).get('LastUsedDate')
if not last_used or last_used < cutoff_date:
unused_roles.append({
'RoleName': role_name,
'LastUsed': last_used.strftime('%Y-%m-%d') if last_used else 'Never'
})
log_unused_resource("IAM role", role_name, "global",
f"Last used: {last_used.strftime('%Y-%m-%d') if last_used else 'Never'}")
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchEntity':
continue
log_error(f"Error checking role {role_name}: {str(e)}")
except Exception as e:
log_error(f"Unexpected error checking role {role_name}: {str(e)}")
except ClientError as e:
log_error(f"Error listing IAM roles: {str(e)}")
except Exception as e:
log_error(f"Unknown error in find_unused_iam_roles: {str(e)}")
return unused_roles
@measure_time
def find_unused_iam_policies():
""" Find unused IAM policies. """
iam = boto3.client('iam')
unused_policies = []
log_region_start("global", "IAM Policies")
try:
policies = iam.list_policies(Scope='Local')
for policy in policies['Policies']:
policy_arn = policy['Arn']
try:
entities = iam.list_entities_for_policy(PolicyArn=policy_arn)
if not (entities['PolicyGroups'] or entities['PolicyUsers'] or entities['PolicyRoles']):
unused_policies.append({
'PolicyName': policy['PolicyName'],
'PolicyId': policy['PolicyId']
})
log_unused_resource("IAM policy", policy['PolicyName'], "global", f"ID: {policy['PolicyId']}")
except ClientError as e:
log_error(f"Error checking policy {policy['PolicyName']}: {e}")
except ClientError as e:
log_error(str(e))
except Exception as e:
log_error(f"Unknown error: {str(e)}")
return unused_policies
@measure_time
def find_unused_ebs_volumes():
""" Find unused EBS volumes. """
ec2_regions = get_regions('ec2')
unused_volumes = []
for region in ec2_regions:
log_region_start(region, "EBS")
try:
ec2 = boto3.client('ec2', region_name=region)
volumes = ec2.describe_volumes(
Filters=[
{
'Name': 'status',
'Values': ['available']
}
]
)
for volume in volumes['Volumes']:
if volume['State'] == 'available':
unused_volumes.append({
'VolumeId': volume['VolumeId'],
'Region': region,
'Size': volume['Size'],
'CreateTime': volume['CreateTime']
})
log_unused_resource("EBS volume", volume['VolumeId'], region,
f"Size: {volume['Size']}GB, Created: {volume['CreateTime'].strftime('%Y-%m-%d')}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_volumes
@measure_time
def find_unused_amis():
""" Find unused AMIs older than 30 days. """
ec2_regions = get_regions('ec2')
unused_amis = []
cutoff_date = datetime.now(timezone.utc) - timedelta(days=30)
for region in ec2_regions:
log_region_start(region, "AMI")
try:
ec2 = boto3.client('ec2', region_name=region)
amis = ec2.describe_images(Owners=['self'])
for ami in amis['Images']:
creation_date = datetime.strptime(ami['CreationDate'], '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
if creation_date < cutoff_date:
unused_amis.append({
'ImageId': ami['ImageId'],
'Region': region,
'Name': ami.get('Name', 'No name'),
'CreationDate': creation_date
})
log_unused_resource("AMI", ami['ImageId'], region,
f"Name: {ami.get('Name', 'No name')}, Created: {creation_date.strftime('%Y-%m-%d')}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_amis
@measure_time
def find_unused_alb():
""" Find unused Application Load Balancers. """
elbv2_regions = get_regions('elbv2')
unused_alb = []
for region in elbv2_regions:
log_region_start(region, "ALB")
try:
elbv2 = boto3.client('elbv2', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
load_balancers = elbv2.describe_load_balancers()
for lb in load_balancers['LoadBalancers']:
if lb['Type'] == 'application':
metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/ApplicationELB',
MetricName='RequestCount',
Dimensions=[
{
'Name': 'LoadBalancer',
'Value': lb['LoadBalancerArn'].split('/')[-1]
}
],
StartTime=datetime.now(timezone.utc) - timedelta(days=7),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Sum']
)
if not metrics['Datapoints'] or sum([dp['Sum'] for dp in metrics['Datapoints']]) == 0:
unused_alb.append({
'LoadBalancerName': lb['LoadBalancerName'],
'Region': region,
'DNSName': lb['DNSName']
})
log_unused_resource("ALB", lb['LoadBalancerName'], region, f"DNS: {lb['DNSName']}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_alb
@measure_time
def find_unused_target_groups():
""" Find unused Target Groups. """
elbv2_regions = get_regions('elbv2')
unused_target_groups = []
for region in elbv2_regions:
log_region_start(region, "Target Groups")
try:
elbv2 = boto3.client('elbv2', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
target_groups = elbv2.describe_target_groups()
for tg in target_groups['TargetGroups']:
metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/ApplicationELB',
MetricName='RequestCount',
Dimensions=[
{
'Name': 'TargetGroup',
'Value': tg['TargetGroupArn'].split('/')[-1]
}
],
StartTime=datetime.now(timezone.utc) - timedelta(days=7),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Sum']
)
if not metrics['Datapoints'] or sum([dp['Sum'] for dp in metrics['Datapoints']]) == 0:
unused_target_groups.append({
'TargetGroupName': tg['TargetGroupName'],
'Region': region,
'Protocol': tg['Protocol'],
'Port': tg['Port']
})
log_unused_resource("Target Group", tg['TargetGroupName'], region,
f"Protocol: {tg['Protocol']}, Port: {tg['Port']}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_target_groups
@measure_time
def find_unused_acm_certificates():
""" Find unused ACM certificates. """
acm_regions = get_regions('acm')
unused_certificates = []
for region in acm_regions:
log_region_start(region, "ACM")
try:
acm = boto3.client('acm', region_name=region)
certificates = acm.list_certificates()
for cert in certificates['CertificateSummaryList']:
cert_arn = cert['CertificateArn']
cert_detail = acm.describe_certificate(CertificateArn=cert_arn)
# Check if certificate is used
in_use = False
if 'InUseBy' in cert_detail['Certificate'] and cert_detail['Certificate']['InUseBy']:
in_use = True
if not in_use:
unused_certificates.append({
'CertificateArn': cert_arn,
'Region': region,
'DomainName': cert['DomainName'],
'Status': cert_detail['Certificate']['Status']
})
log_unused_resource("ACM certificate", cert['DomainName'], region,
f"Status: {cert_detail['Certificate']['Status']}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_certificates
@measure_time
def find_unused_vpc_resources():
""" Find unused VPC resources. """
ec2_regions = get_regions('ec2')
unused_resources = {
'vpc': [],
'subnets': [],
'security_groups': [],
'network_interfaces': []
}
for region in ec2_regions:
log_region_start(region, "VPC")
try:
ec2 = boto3.client('ec2', region_name=region)
# Get list of default VPCs
default_vpcs = []
vpcs = ec2.describe_vpcs()
for vpc in vpcs['Vpcs']:
if vpc['CidrBlock'] == '172.31.0.0/16' or vpc['IsDefault']:
default_vpcs.append(vpc['VpcId'])
# Check VPC
for vpc in vpcs['Vpcs']:
vpc_id = vpc['VpcId']
# Skip default VPCs
if vpc_id in default_vpcs:
continue
# Check if VPC is used
instances = ec2.describe_instances(Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}])
if not instances['Reservations']:
unused_resources['vpc'].append({
'VpcId': vpc_id,
'Region': region,
'CidrBlock': vpc['CidrBlock']
})
log_unused_resource("VPC", vpc_id, region, f"CIDR: {vpc['CidrBlock']}")
# Check subnet
subnets = ec2.describe_subnets()
for subnet in subnets['Subnets']:
subnet_id = subnet['SubnetId']
# Skip subnets in default VPCs
if subnet['VpcId'] in default_vpcs:
continue
# Check if subnet is used
instances = ec2.describe_instances(Filters=[{'Name': 'subnet-id', 'Values': [subnet_id]}])
if not instances['Reservations']:
unused_resources['subnets'].append({
'SubnetId': subnet_id,
'Region': region,
'CidrBlock': subnet['CidrBlock']
})
log_unused_resource("Subnet", subnet_id, region, f"CIDR: {subnet['CidrBlock']}")
# Check security groups
security_groups = ec2.describe_security_groups()
for sg in security_groups['SecurityGroups']:
sg_id = sg['GroupId']
# Skip default security groups
if sg['GroupName'] == 'default' or sg['VpcId'] in default_vpcs:
continue
# Check if security group is used
instances = ec2.describe_instances(Filters=[{'Name': 'instance.group-id', 'Values': [sg_id]}])
if not instances['Reservations']:
unused_resources['security_groups'].append({
'GroupId': sg_id,
'Region': region,
'GroupName': sg['GroupName']
})
log_unused_resource("Security Group", sg_id, region, f"Name: {sg['GroupName']}")
# Check network interfaces
network_interfaces = ec2.describe_network_interfaces()
for ni in network_interfaces['NetworkInterfaces']:
# Skip network interfaces in default VPCs
if ni['VpcId'] in default_vpcs:
continue
if ni['Status'] == 'available':
unused_resources['network_interfaces'].append({
'NetworkInterfaceId': ni['NetworkInterfaceId'],
'Region': region,
'SubnetId': ni['SubnetId']
})
log_unused_resource("Network Interface", ni['NetworkInterfaceId'], region,
f"Subnet: {ni['SubnetId']}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_resources
@measure_time
def find_unused_ec2_resources():
""" Find unused EC2 resources. """
ec2_regions = get_regions('ec2')
unused_resources = {
'elastic_ips': [],
'placement_groups': [],
'dedicated_hosts': []
}
for region in ec2_regions:
log_region_start(region, "EC2 Resources")
try:
ec2 = boto3.client('ec2', region_name=region)
# Check Elastic IPs
addresses = ec2.describe_addresses()
for address in addresses['Addresses']:
if 'InstanceId' not in address:
unused_resources['elastic_ips'].append({
'AllocationId': address['AllocationId'],
'Region': region,
'PublicIp': address['PublicIp']
})
log_unused_resource("Elastic IP", address['AllocationId'], region,
f"IP: {address['PublicIp']}")
# Check Placement Groups
placement_groups = ec2.describe_placement_groups()
for pg in placement_groups['PlacementGroups']:
instances = ec2.describe_instances(Filters=[{'Name': 'placement-group-name', 'Values': [pg['GroupName']]}])
if not instances['Reservations']:
unused_resources['placement_groups'].append({
'GroupName': pg['GroupName'],
'Region': region,
'Strategy': pg['Strategy']
})
log_unused_resource("Placement Group", pg['GroupName'], region,
f"Strategy: {pg['Strategy']}")
# Check Dedicated Hosts
try:
hosts = ec2.describe_hosts()
for host in hosts['Hosts']:
if host['State'] == 'available' and not host.get('Instances'):
unused_resources['dedicated_hosts'].append({
'HostId': host['HostId'],
'Region': region,
'InstanceFamily': host.get('HostProperties', {}).get('InstanceFamily', 'unknown')
})
log_unused_resource("Dedicated Host", host['HostId'], region,
f"Family: {host.get('HostProperties', {}).get('InstanceFamily', 'unknown')}")
except ClientError as e:
if e.response['Error']['Code'] == 'InvalidParameterValue':
logging.warning(f"{Fore.YELLOW}Dedicated Hosts not supported in region {region}{Style.RESET_ALL}")
else:
raise
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_resources
@measure_time
def find_unused_iam_resources():
""" Find unused IAM resources. """
iam = boto3.client('iam')
unused_resources = {
'users': [],
'groups': [],
'access_keys': []
}
log_region_start("global", "IAM Resources")
try:
# Check IAM Users
users = iam.list_users()
for user in users['Users']:
user_name = user['UserName']
last_used = iam.get_user(UserName=user_name)['User'].get('PasswordLastUsed')
if not last_used or (datetime.now(timezone.utc) - last_used.replace(tzinfo=timezone.utc)).days > 90:
unused_resources['users'].append({
'UserName': user_name,
'LastUsed': last_used.strftime('%Y-%m-%d') if last_used else 'Never'
})
log_unused_resource("IAM User", user_name, "global",
f"Last used: {last_used.strftime('%Y-%m-%d') if last_used else 'Never'}")
# Check IAM Groups
groups = iam.list_groups()
for group in groups['Groups']:
group_name = group['GroupName']
            group_users = iam.get_group(GroupName=group_name)  # renamed so the list_users() result above is not clobbered
            if not group_users['Users']:
unused_resources['groups'].append({
'GroupName': group_name,
'CreateDate': group['CreateDate'].strftime('%Y-%m-%d')
})
log_unused_resource("IAM Group", group_name, "global",
f"Created: {group['CreateDate'].strftime('%Y-%m-%d')}")
# Check Access Keys
for user in users['Users']:
access_keys = iam.list_access_keys(UserName=user['UserName'])
for key in access_keys['AccessKeyMetadata']:
last_used = iam.get_access_key_last_used(AccessKeyId=key['AccessKeyId'])
if not last_used['AccessKeyLastUsed'].get('LastUsedDate'):
unused_resources['access_keys'].append({
'AccessKeyId': key['AccessKeyId'],
'UserName': user['UserName'],
'CreateDate': key['CreateDate'].strftime('%Y-%m-%d')
})
log_unused_resource("Access Key", key['AccessKeyId'], "global",
f"User: {user['UserName']}")
except ClientError as e:
log_error(str(e))
except Exception as e:
log_error(f"Unknown error: {str(e)}")
return unused_resources
@measure_time
def find_unused_ecr_repositories():
""" Find unused ECR repositories. """
ecr_regions = get_regions('ecr')
unused_repositories = []
for region in ecr_regions:
log_region_start(region, "ECR")
try:
ecr = boto3.client('ecr', region_name=region)
repositories = ecr.describe_repositories()
for repo in repositories['repositories']:
repo_name = repo['repositoryName']
# Get image details
try:
images = ecr.describe_images(repositoryName=repo_name)
# Check if repository has any images
if not images['imageDetails']:
unused_repositories.append({
'RepositoryName': repo_name,
'Region': region,
'CreatedAt': repo['createdAt'].strftime('%Y-%m-%d')
})
log_unused_resource("ECR repository", repo_name, region,
f"Created: {repo['createdAt'].strftime('%Y-%m-%d')}")
else:
# Check last image push time
last_push = max(img['imagePushedAt'] for img in images['imageDetails'])
if (datetime.now(timezone.utc) - last_push.replace(tzinfo=timezone.utc)).days > 90:
unused_repositories.append({
'RepositoryName': repo_name,
'Region': region,
'LastPush': last_push.strftime('%Y-%m-%d'),
'ImageCount': len(images['imageDetails'])
})
log_unused_resource("ECR repository", repo_name, region,
f"Last push: {last_push.strftime('%Y-%m-%d')}, "
f"Images: {len(images['imageDetails'])}")
except ecr.exceptions.RepositoryNotFoundException:
continue
except Exception as e:
log_error(f"Error checking repository {repo_name}: {str(e)}", region)
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
log_region_end(region)
return unused_repositories
if __name__ == "__main__":
RUN_COUNTER += 1
log_run_start()
logging.info(f"{Fore.MAGENTA}Starting AWS resources check{Style.RESET_ALL}")
start_time = time.time()
try:
all_results = {
'ec2_instances': find_ec2_instances(),
'rds_instances': find_rds_instances(),
'eks_clusters': find_eks_clusters(),
'lambda_functions': find_unused_lambda_functions(),
'elasticache_clusters': find_unused_elasticache_clusters(),
'ec2_snapshots': find_unused_ec2_snapshots(),
'rds_snapshots': find_unused_rds_snapshots(),
'elasticache_snapshots': find_unused_elasticache_snapshots(),
'kms_keys': find_unused_kms_keys(),
's3_buckets': find_unused_s3_buckets(),
'dynamodb_tables': find_unused_dynamodb_tables(),
'iam_roles': find_unused_iam_roles(),
'iam_policies': find_unused_iam_policies(),
'ebs_volumes': find_unused_ebs_volumes(),
'amis': find_unused_amis(),
'alb': find_unused_alb(),
'target_groups': find_unused_target_groups(),
'acm_certificates': find_unused_acm_certificates(),
'vpc_resources': find_unused_vpc_resources(),
'ec2_resources': find_unused_ec2_resources(),
'iam_resources': find_unused_iam_resources(),
'ecr_repositories': find_unused_ecr_repositories()
}
save_results_to_file(all_results)
total_resources = sum(len(v) if isinstance(v, list) else sum(len(x) for x in v.values() if isinstance(x, list)) for v in all_results.values())
total_execution_time = time.time() - start_time
hours, remainder = divmod(total_execution_time, 3600)
minutes, seconds = divmod(remainder, 60)
logging.info(f"{Fore.MAGENTA}Found total of {total_resources} resources{Style.RESET_ALL}")
logging.info(f"{Fore.MAGENTA}Total execution time: {int(hours)}h {int(minutes)}m {seconds:.2f}s{Style.RESET_ALL}")
logging.info(f"{Fore.MAGENTA}Execution time statistics:{Style.RESET_ALL}")
for func_name, exec_time in execution_times.items():
logging.info(f"{Fore.CYAN}{func_name}: {exec_time:.2f} seconds{Style.RESET_ALL}")
except NoCredentialsError:
log_error("AWS credentials not found. Please configure your AWS credentials.")
except ClientError as e:
log_error(f"AWS API error: {e}")
except Exception as e:
log_error(f"Unknown error: {e}")
finally:
log_run_end()