Automating AWS Cost Optimization: A Deep Dive into Unused Resource Detection
Written by:
Igor Gorovyy
DevOps Engineer Lead & Senior Solutions Architect
Objective
It just so happened that I was writing an article about the operational day-to-day activities of a FinOps engineer — focusing on which things are important to monitor daily, which tools assist in doing so, and how to identify "unused" or "abandoned" resources. And now, while closing down several projects, I had the chance to once again validate the effectiveness of the approach and tool I had chosen.
I'm sharing my personal approach, specifically in the context of AWS Cloud, though I believe many face similar situations in other clouds. Every cloud provider — including AWS — offers many native tools, and while they're certainly useful, they each come with limitations and drawbacks. Sooner or later, an engineer finds themselves needing a custom tool tailored to their specific context.
AWS makes it easy to provision new resources, but due to human factors and immature processes, these resources often go unnoticed once a project ends — leading to unnecessary costs.
The tool described in the article helps detect resources that are either unused or underutilized. This allows teams to reduce cloud waste and improve overall infrastructure governance and cost-efficiency.
Cloud-native applications tend to grow rapidly — and so do their costs. One major contributor to unnecessary AWS spend is abandoned or underutilized resources: EC2 instances left running, unattached volumes, idle Lambda functions, and more.
This article presents an approach to building a custom solution for identifying and inventorying unused (abandoned) resources in your AWS account to achieve cost optimization, and automate FinOps practices.
🧰 The article provides both a conceptual approach and a partial implementation of the scanner, written in Python using boto3, CloudWatch, STS, colorama, and structured JSON logging.
⚠️ Clarifications or Partial Accuracy
Statement | Comment |
---|---|
"An S3 bucket can be considered unused if there are no activity metrics and it is empty." | ⚠️ This is a general heuristic, but not a 100% guarantee — CloudWatch Metrics might be limited unless S3 Storage Class Analysis or advanced logging is enabled. |
"A KMS Key is unused if CloudWatch shows zero usage." | ⚠️ Use caution: not all key usage is logged as metrics. For example, automatic encryption in S3 or EBS may not appear in CloudWatch metrics. |
"Default VPC can be safely deleted." | ⚠️ Only true if you're absolutely sure no resources are using it — some AWS services use the default VPC automatically. |
"Dedicated Hosts are considered unused if there are no running instances." | ⚠️ Correct, but sometimes they are reserved ahead of time for specific workloads — context is important. |
This article focuses on the core mechanism for detecting abandoned or idle cloud resources. The broader infrastructure is only partially presented.
If you're interested in the full implementation, feel free to leave a comment — I'll provide a more complete overview and share the full source code.
Why a Custom Scanner?
While AWS provides several built-in tools like Trusted Advisor, Compute Optimizer, and Cost Explorer, they often:
- Require a premium support plan
- Are limited in scope (e.g., only EC2, EBS, or CPU-related metrics)
- Do not offer automation, JSON reports, or Slack/email alerts
This custom scanner fills those gaps.
Key Features
✅ Multi-region scanning per service
✅ Service support: EC2, RDS, Lambda, S3, IAM, ACM, DynamoDB, VPC, EBS, ELB, ECR, and more
✅ CloudWatch metrics evaluation (e.g., CPU, connections, invocations)
✅ JSON output report with timestamp and account ID
✅ Execution time tracking per function
✅ Slack + SES email notification support
✅ S3 export for FinOps dashboards
✅ EventBridge-compatible for scheduling
✅ Language support: English 🇺🇸 + Ukrainian 🇺🇦
How It Works
Diagram
```mermaid
flowchart TD
%% Main Flow
A[⏰ EventBridge Schedule<br>Weekly/Monthly] --> B[🚀 AWS Lambda / ECS Task]
B --> C[🧠 FinOps Scanner Script]
%% Resource Scanning
subgraph "Resource Scanning Phase"
C --> D1[🔍 Scan AWS Services]
C --> D2[📊 CloudWatch Metrics]
C --> D3[🧾 Resource Analysis]
D1 --> D3
D2 --> D3
end
%% Analysis & Reporting
subgraph "Analysis & Reporting Phase"
D3 --> E1[💾 JSON Report to S3]
D3 --> E2[🔔 Slack Alerts]
D3 --> E3[📧 Email via SES]
end
%% Data Pipeline
subgraph "Data Pipeline"
E1 --> F1[S3 Bucket]
F1 --> F2[Glue Crawler]
F2 --> F3[Athena]
F3 --> F4[Visualization]
subgraph "Visualization Options"
F4 --> G1[QuickSight]
F4 --> G2[Grafana]
end
end
%% Styling
classDef schedule fill:#f8f9fa,stroke:#6c757d,stroke-width:2px
classDef compute fill:#e9ecef,stroke:#495057,stroke-width:2px
classDef scanner fill:#dee2e6,stroke:#343a40,stroke-width:2px
classDef scan fill:#ced4da,stroke:#212529,stroke-width:2px
classDef report fill:#adb5bd,stroke:#1a1e21,stroke-width:2px
classDef data fill:#6c757d,stroke:#f8f9fa,stroke-width:2px
classDef viz fill:#495057,stroke:#e9ecef,stroke-width:2px
class A schedule
class B compute
class C scanner
class D1,D2,D3 scan
class E1,E2,E3 report
class F1,F2,F3,F4 data
class G1,G2 viz
```
1. Startup
- Detect AWS account ID
- Configure logging with timestamped files
- Parse language from environment variable (default: UA)
2. Execution Timing
Each function is wrapped in a decorator to measure and log runtime.
3. Per-service Resource Analysis
Each major AWS service has a dedicated find_*() function:
Service | What It Checks |
---|---|
EC2 | Running instances with CPU usage < threshold |
RDS | Low CPU or DB connections |
Lambda | No invocations in last 7 days |
ElastiCache | Clusters with CPU < 5% |
EBS Volumes | Unattached (status: available) |
Snapshots | Older than 30 days (EC2, RDS, ElastiCache) |
S3 | Empty buckets with no access in 90 days |
IAM | Roles, users, policies, and keys not used |
ACM/KMS | Unused certificates and keys |
DynamoDB | No read capacity used |
ECR | No image push in last 90 days |
ALBs / TGs | No request traffic via CloudWatch |
VPC | Unused subnets, SGs, network interfaces |
Output
At the end of a scan, the script:
- Saves the full results to results/aws_waste_results_<account_id>_<timestamp>.json
- Includes execution time per function
- Logs a summary (number of items found, time taken)
```json
{
"ec2_instances": [],
"rds_instances": [],
"eks_clusters": [],
"lambda_functions": [
{
"FunctionName": "Demo-LambdaFunction-ASHKskIYvVLJ",
"Region": "eu-central-1"
}
],
"elasticache_clusters": [],
"ec2_snapshots": [],
"rds_snapshots": [],
"elasticache_snapshots": [],
"kms_keys": [
{
"KeyId": "mrk-8d3588c3caf64d86b82c30994703057a",
"Region": "eu-central-1",
"Description": "CMK Infra S3 tfstate"
}
],
"s3_buckets": [],
"dynamodb_tables": [],
"iam_roles": [
{
"RoleName": "AWSServiceRoleForCloudTrail",
"LastUsed": "2024-11-26"
},
{
"RoleName": "AWSServiceRoleForComputeOptimizer",
"LastUsed": "Never"
},
{
"RoleName": "AWSServiceRoleForKeyManagementServiceMultiRegionKeys",
"LastUsed": "2024-12-11"
},
{
"RoleName": "AWSServiceRoleForSupport",
"LastUsed": "Never"
},
{
"RoleName": "AWSServiceRoleForTrustedAdvisor",
"LastUsed": "Never"
},
{
"RoleName": " Demo-CrossAccountRole-rwI5LmDqcHJR",
"LastUsed": "2024-12-04"
},
{
"RoleName": " Demo-LambdaExecutionRole-v4VLtulWfCW2",
"LastUsed": "2024-11-25"
}
],
"iam_policies": [],
"ebs_volumes": [],
"amis": [],
"alb": [],
"target_groups": [],
"acm_certificates": [],
"vpc_resources": {
"vpc": [],
"subnets": [],
"security_groups": [],
"network_interfaces": []
},
"ec2_resources": {
"elastic_ips": [],
"placement_groups": [],
"dedicated_hosts": []
},
"iam_resources": {
"users": [
{
"UserName": "provisioner",
"LastUsed": "Never"
}
],
"groups": [],
"access_keys": []
},
"ecr_repositories": [],
"execution_times": {
"find_idle_ec2_instances": 26.1112859249115,
"find_unused_rds_instances": 14.077875852584839,
"find_idle_eks_clusters": 5.355216979980469,
"find_unused_lambda_functions": 21.097691774368286,
"find_unused_elasticache_clusters": 13.915351152420044,
"find_unused_ec2_snapshots": 27.415552139282227,
"find_unused_rds_snapshots": 14.19884705543518,
"find_unused_elasticache_snapshots": 12.656662940979004,
"find_unused_kms_keys": 31.967848300933838,
"find_unused_s3_buckets": 0.30380916595458984,
"find_unused_dynamodb_tables": 44.07150626182556,
"find_unused_iam_roles": 2.6107289791107178,
"find_unused_iam_policies": 0.53951096534729,
"find_unused_ebs_volumes": 27.511493682861328,
"find_unused_amis": 25.320037841796875,
"find_unused_alb": 25.871586084365845,
"find_unused_target_groups": 24.60790991783142,
"find_unused_acm_certificates": 20.009007215499878,
"find_unused_vpc_resources": 41.87678790092468,
"find_unused_ec2_resources": 32.71801400184631,
"find_unused_iam_resources": 1.2918949127197266,
"find_unused_ecr_repositories": 13.991612195968628
}
}
```
FinOps Dashboard Integration
You can easily feed this data into a dashboard:
- Upload to S3
- Use Glue Crawler to define schema
- Query with Athena
- Build reports in QuickSight or Grafana
Examples:
- 📊 Top 10 idle EC2 by cost
- 🗺️ Heatmap of resource sprawl per region
- 📉 Usage trend over time
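For example, once the Glue crawler has catalogued the reports, the JSON arrays can be queried directly from Athena. Below is a minimal sketch using boto3; the database name (finops), table name (aws_waste_results), and the results bucket are placeholders, adjust them to whatever your crawler actually created.

```python
import boto3

# Minimal sketch: count idle Lambda functions per region from the scanner reports.
# "finops", "aws_waste_results" and the output bucket below are assumed names;
# replace them with whatever your Glue crawler created.
athena = boto3.client("athena", region_name="eu-central-1")

query = """
SELECT fn.region, count(*) AS idle_lambda_functions
FROM aws_waste_results
CROSS JOIN UNNEST(lambda_functions) AS t(fn)
GROUP BY fn.region
ORDER BY idle_lambda_functions DESC
"""

execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "finops"},
    ResultConfiguration={"OutputLocation": "s3://your-bucket-name/athena-results/"},
)
print("Query started:", execution["QueryExecutionId"])
```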
IAM Permissions Required
The scanner requires read-only + CloudWatch + S3 + SES permissions. A sample IAM policy includes:
- ec2:Describe*, rds:Describe*, lambda:ListFunctions
- cloudwatch:GetMetricStatistics
- s3:PutObject, ses:SendEmail
- logs:* (for Lambda logging)
This is the full IAM policy:
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ReadResourceMetadata",
"Effect": "Allow",
"Action": [
"ec2:Describe*",
"rds:Describe*",
"eks:Describe*",
"eks:List*",
"lambda:ListFunctions",
"lambda:GetFunction",
"elasticache:Describe*",
"dynamodb:ListTables",
"dynamodb:DescribeTable",
"acm:ListCertificates",
"acm:DescribeCertificate",
"kms:ListKeys",
"kms:DescribeKey",
"s3:ListAllMyBuckets",
"s3:ListBucket",
"s3:GetBucketLocation",
"s3:GetBucketTagging",
"s3:GetBucketAcl",
"ecr:DescribeRepositories",
"ecr:DescribeImages",
"iam:ListRoles",
"iam:GetRole",
"iam:ListPolicies",
"iam:ListEntitiesForPolicy",
"iam:ListUsers",
"iam:GetUser",
"iam:ListAccessKeys",
"iam:GetAccessKeyLastUsed",
"iam:ListGroups",
"iam:GetGroup"
],
"Resource": "*"
},
{
"Sid": "CloudWatchMetricsAccess",
"Effect": "Allow",
"Action": [
"cloudwatch:GetMetricStatistics",
"cloudwatch:ListMetrics"
],
"Resource": "*"
},
{
"Sid": "AllowSESSendEmail",
"Effect": "Allow",
"Action": [
"ses:SendEmail",
"ses:SendRawEmail"
],
"Resource": "*"
},
{
"Sid": "AllowS3UploadResults",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject"
],
"Resource": "arn:aws:s3:::your-bucket-name/*"
},
{
"Sid": "AllowLogAccess",
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
},
{
"Sid": "AllowSTSID",
"Effect": "Allow",
"Action": [
"sts:GetCallerIdentity"
],
"Resource": "*"
}
]
}
```
Feature Comparison
Feature / Tool | Trusted Advisor | Compute Optimizer | Cost Explorer | Resource Explorer | This Script |
---|---|---|---|---|---|
EC2, RDS, Lambda scan | ✅ (partial) | ✅ (CPU only) | ⚠️ (by trend) | ❌ | ✅ |
Unused S3 detection | ❌ | ❌ | ❌ | ❌ | ✅ |
IAM usage visibility | ❌ | ❌ | ❌ | ❌ | ✅ |
ACM/KMS scan | ❌ | ❌ | ❌ | ❌ | ✅ |
Snapshot cleanup | ✅ | ❌ | ❌ | ❌ | ✅ |
Slack/Email notifications | ❌ | ❌ | ❌ | ❌ | ✅ |
JSON reports | ❌ | ❌ | ❌ | ❌ | ✅ |
Automation & scheduling | ❌ | ❌ | ❌ | ❌ | ✅ EventBridge |
Dashboard-ready | ⚠️ | ✅ | ✅ | ⚠️ | ✅ Athena/S3 |
Premium support needed | ✅ Yes | ❌ | ❌ | ❌ | ❌ |
Extendable (custom logic) | ❌ | ❌ | ❌ | ❌ | ✅ |
Deployment Options
- Run as Lambda (use included Dockerfile or zip package)
- Schedule via EventBridge
- Use Terraform for IAM setup
- Customize thresholds (CPU%, days, etc.)
Running as AWS Lambda
- Using Dockerfile: You can create a container with this script and deploy it as a container-based Lambda function. This allows you to include all necessary dependencies and libraries without the size limitations of a regular Lambda package.
- ZIP package: Alternatively, the code can be packaged into a ZIP file with dependencies for standard Lambda deployment. This is faster to deploy but may be more challenging for dependency management.
- Timeout Configuration: Set Lambda timeout to 15 minutes for complete scanning of all services.
- Memory Configuration: It's recommended to configure at least 1024MB of memory for efficient scanner operation.
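A minimal handler sketch is shown below. It assumes the scanner functions are importable from a module called scanner (a hypothetical packaging choice) and that reports should land in the same S3 bucket used elsewhere in this article:

```python
import json
import boto3

import scanner  # hypothetical module name for the script shown at the end of this article


def lambda_handler(event, context):
    # Run a subset of the checks; add more find_*() calls as needed.
    results = {
        "lambda_functions": scanner.find_unused_lambda_functions(),
        "ebs_volumes": scanner.find_unused_ebs_volumes(),
    }

    # Persist the report so the Glue/Athena pipeline can pick it up.
    boto3.client("s3").put_object(
        Bucket="your-bucket-name",  # assumption: same bucket as in the IAM policy above
        Key="reports/aws_waste_results.json",
        Body=json.dumps(results, default=str),
    )
    return {"statusCode": 200, "body": "scan complete"}
```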
Scheduling via EventBridge
- Regular Execution: Configure an EventBridge rule for weekly or monthly scanner execution.
- Cron Expression: Use an expression like cron(0 9 ? * MON *) to run every Monday at 9 AM.
- Parameterization: Pass different parameters for different runs (e.g., different AWS accounts via Lambda parameters).
- Notifications: Configure EventBridge to send results to SNS for notifications after scanning completes.
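If you prefer to wire the schedule up in code rather than the console, a rough boto3 equivalent of the rule above could look like this (the function name and ARN are placeholders):

```python
import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

# Weekly schedule: every Monday at 09:00 UTC, matching the cron expression above.
rule = events.put_rule(
    Name="finops-scanner-weekly",
    ScheduleExpression="cron(0 9 ? * MON *)",
    State="ENABLED",
)

# Point the rule at the scanner Lambda; the function name/ARN below are assumptions.
events.put_targets(
    Rule="finops-scanner-weekly",
    Targets=[{
        "Id": "finops-scanner",
        "Arn": "arn:aws:lambda:eu-central-1:123456789012:function:finops-scanner",
    }],
)

# Allow EventBridge to invoke the function.
lambda_client.add_permission(
    FunctionName="finops-scanner",
    StatementId="finops-scanner-eventbridge",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule["RuleArn"],
)
```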
Using Terraform for IAM Setup
- IAM Module: Create a Terraform module that automatically configures all necessary IAM roles and policies.
- Multi-Account Support: Configure trust relationships to work with multiple AWS accounts.
- Principle of Least Privilege: Adapt the IAM policy shown in the article to grant only necessary permissions depending on the functionality you're using.
- Terraform Variables: Use variables for flexible configuration of S3 buckets, SES email addresses and other parameters.
Customizing Thresholds
- Configuration File: Create a configuration JSON file to store threshold values for different services.
- Environment Variables: Use environment variables to pass threshold values to the Lambda function.
- Example Values:
- CPU usage: 5% for detecting unused EC2 instances
- Time interval: 7 days for Lambda functions without invocations
- Snapshot age: 30 days for detecting old unclaimed snapshots
- S3 activity: 90 days without access for determining unused buckets
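As an illustration, the thresholds above could be read once at startup from environment variables, with the listed example values as defaults (the variable names here are my own, not part of the script):

```python
import os

# Hypothetical threshold configuration; defaults mirror the example values above.
THRESHOLDS = {
    "cpu_idle_percent": float(os.getenv("CPU_IDLE_PERCENT", "5")),     # EC2 considered idle below this
    "lambda_idle_days": int(os.getenv("LAMBDA_IDLE_DAYS", "7")),       # no invocations for N days
    "snapshot_age_days": int(os.getenv("SNAPSHOT_AGE_DAYS", "30")),    # snapshots older than N days
    "s3_inactivity_days": int(os.getenv("S3_INACTIVITY_DAYS", "90")),  # buckets with no access for N days
}
```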
Best Practices
- Use resource tagging: Owner, Purpose, TTL
- Review flagged resources before deletion
- Schedule weekly/monthly scans
- Visualize trends using S3+Athena dashboards
- Combine with native tools for full FinOps coverage
Implementing Resource Tagging
- Owner Tags: Always tag resources with the responsible person or team. This facilitates accountability and makes it easier to identify who to contact before taking action on unused resources.
- Format example: Owner: team-name@company.com or Owner: devops-team
- Purpose Tags: Add tags describing the resource's function or the project it belongs to, helping determine if it's still needed.
- Format example: Project: customer-portal or Environment: staging
- TTL (Time-to-Live) Tags: For temporary resources, set an expiration date to automate cleanup.
- Format example: TTL: 2025-06-30 or Expiry: Q2-2025
- Automated Tag Enforcement: Use AWS Config Rules or Organizations Tag Policies to enforce tagging standards across all accounts.
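To show the idea, here is a small sketch that flags running EC2 instances missing the mandatory tags (the required tag keys are an assumption, use your own standard):

```python
import boto3

REQUIRED_TAGS = {"Owner", "Purpose"}  # assumption: your org's mandatory tag keys


def find_untagged_instances(region="eu-central-1"):
    """Flag running EC2 instances that are missing the required tags."""
    ec2 = boto3.client("ec2", region_name=region)
    untagged = []
    paginator = ec2.get_paginator("describe_instances")
    for page in paginator.paginate(
        Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
    ):
        for reservation in page["Reservations"]:
            for instance in reservation["Instances"]:
                tag_keys = {t["Key"] for t in instance.get("Tags", [])}
                if not REQUIRED_TAGS.issubset(tag_keys):
                    untagged.append(instance["InstanceId"])
    return untagged
```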
Reviewing Flagged Resources
- Verification Process: Establish a review workflow that includes verification steps before resource deletion.
- Quarantine Approach: Instead of immediate deletion, consider moving resources to a "quarantine" state (e.g., stopping instances instead of terminating).
- Notification Period: Send notifications to resource owners and allow a grace period (e.g., 14 days) before taking action.
- Change Management: Document all cleanup actions in your change management system for audit purposes.
- Resource Dependencies: Check for hidden dependencies before removing resources (e.g., EBS volumes that appear unused but are part of a snapshot lifecycle).
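The quarantine approach can be as simple as stopping and tagging flagged instances instead of terminating them. A sketch along these lines (the tag key and value are placeholders):

```python
import boto3


def quarantine_instances(instance_ids, region):
    """Stop (not terminate) flagged instances and tag them for later review."""
    if not instance_ids:
        return
    ec2 = boto3.client("ec2", region_name=region)
    ec2.stop_instances(InstanceIds=instance_ids)
    ec2.create_tags(
        Resources=instance_ids,
        Tags=[{"Key": "Quarantine", "Value": "pending-review"}],  # placeholder tag
    )
```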
Scheduling Regular Scans
- Weekly Light Scans: Run lightweight scans weekly to identify obvious waste (e.g., stopped instances, unattached volumes).
- Monthly Deep Scans: Schedule comprehensive monthly scans that analyze CloudWatch metrics for usage patterns.
- Quarterly Audits: Perform quarterly comprehensive reviews including manual verification of large or critical resources.
- Report Distribution: Automatically distribute scan results to team leaders and financial stakeholders.
- Action Tracking: Maintain a tracker for identified waste and remediation actions to measure the program's effectiveness.
Visualizing Cost Trends
- S3 + Athena Architecture: Store scanner results in S3, use Glue crawlers to catalog the data, and query with Athena.
- Dashboard Metrics: Create dashboards showing:
- Resource waste by service type
- Cost savings opportunities by team or project
- Historical trend of resource utilization
- Top waste contributors
- Grafana Integration: Use custom Grafana dashboards with Athena data source for real-time visibility.
- Executive Reporting: Create simplified executive dashboards focusing on cost trends and savings realized.
Complementary Native Tools
- AWS Cost Explorer: Use alongside this scanner for billing-based analysis and reserved instance coverage.
- Trusted Advisor: Supplement scanner findings with Trusted Advisor for security and performance recommendations.
- AWS Compute Optimizer: Leverage for right-sizing recommendations to complement idle resource detection.
- AWS Budgets: Set up budget alerts to complement waste detection with overall spend monitoring.
- Cost Anomaly Detection: Enable AWS Cost Anomaly Detection for unexpected spending increases that might indicate resource sprawl.
Bonus: Add-on Tools
Slack Bot Integration
- Real-time Alerting: Create a Slack bot that posts scanner findings directly to designated channels.
- Implementation Options:
- Use AWS Lambda with the Slack API to post messages when scanner results are published to S3
- Leverage AWS Chatbot for native Slack integration
- Alert Prioritization: Set thresholds for different urgency levels:
- High priority: Expensive resources with zero utilization (e.g., idle RDS instances)
- Medium priority: Resources with very low utilization
- Low priority: Weekly summary of all identified waste
- Interactive Buttons: Add interactive elements to Slack messages that allow users to:
- Mark resources as "needed" to prevent future alerts
- Schedule automatic shutdown/cleanup for a specific date
- Assign review tasks to team members
- Code Examples: The repository includes a Lambda function that processes scanner results and formats them for Slack notification.
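As a minimal variant (using an incoming webhook rather than the full Slack API or AWS Chatbot), a notifier could summarize the scanner output like this; the webhook URL is of course a placeholder:

```python
import json
import urllib.request

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # placeholder


def notify_slack(findings):
    """Post a short summary of scanner findings to a Slack channel."""
    text = "\n".join(
        f"*{service}*: {len(items)} unused resource(s)"
        for service, items in findings.items()
        if isinstance(items, list) and items
    )
    payload = json.dumps({"text": text or "No unused resources found."}).encode("utf-8")
    request = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request)
```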
SES Email Digest System
- Automated Report Generation: Send periodic email digests summarizing detected waste and potential savings.
- Email Templates: Create HTML templates with:
- Executive summary at the top
- Cost savings metrics and graphs
- Detailed tables of unused resources by service type
- Resource owner information parsed from tags
- Customization Options:
- Configure different report types per recipient (technical vs. financial)
- Include AWS Cost Explorer links for deeper analysis
- Add calendar links to schedule review meetings
- Compliance Features: Include audit trails and documentation links to meet change management requirements.
- Delivery Options: Configure based on organizational preference:
- Daily digest for DevOps teams
- Weekly summary for team leads
- Monthly executive report for management
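A bare-bones SES sender could look like the sketch below; the source address and region are assumptions, and the HTML body would come from whatever template you build:

```python
import boto3

ses = boto3.client("ses", region_name="eu-central-1")


def send_digest(html_body, recipients):
    """Send an HTML digest via SES; sender and recipients are assumptions."""
    ses.send_email(
        Source="finops-scanner@company.com",  # must be a verified SES identity
        Destination={"ToAddresses": recipients},
        Message={
            "Subject": {"Data": "Weekly AWS waste report"},
            "Body": {"Html": {"Data": html_body}},
        },
    )
```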
GitHub Actions CI/CD Pipeline
- Automated Deployment: Set up GitHub Actions workflows to deploy the scanner to Lambda whenever changes are committed.
- Pipeline Components:
- Unit tests for scanner functions
- Security scanning of dependencies
- Package creation (ZIP or container image)
- Deployment to multiple AWS environments
- Progressive Deployment: Implement a progressive deployment strategy:
- Deploy to development environment first
- Run validation tests
- Automatically progress to production if tests pass
- Version Control: Maintain version tagging for Lambda deployments to enable rollbacks.
- Pull Request Automation: Automatically test and validate PRs with simulated runs against test AWS accounts.
- Infrastructure as Code: Include terraform code for Lambda and required resources.
Multi-Account Terraform Module
- Cross-Account Scanning: Create a Terraform module that enables deploying the scanner across multiple AWS accounts.
- Architecture Components:
- Central monitoring account for aggregating findings
- Cross-account IAM roles with minimal required permissions
- Resource share configurations for consolidated reporting
- Deployment Options:
- Standalone deployment per account
- Hub-and-spoke model with a central reporting account
- Integration with AWS Organizations for automatic enrollment of new accounts
- State Management: Implement remote state storage with proper locking mechanisms.
- Variable Customization: Allow environment-specific configurations through Terraform variables:
- Custom thresholds per account/environment
- Service exclusions for specialized accounts
- Integration points for existing notification systems
- Compliance Features: Built-in compliance checks and guardrails for security requirements.
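The core of cross-account scanning is just assuming a role in each member account before creating boto3 clients. A sketch, assuming the Terraform module creates a role named finops-scanner-readonly in every account (the role name is an assumption):

```python
import boto3


def client_for_account(service, account_id, region, role_name="finops-scanner-readonly"):
    """Assume a read-only role in a member account and return a boto3 client.

    The role name is an assumption; use whatever your Terraform module creates.
    """
    sts = boto3.client("sts")
    creds = sts.assume_role(
        RoleArn=f"arn:aws:iam::{account_id}:role/{role_name}",
        RoleSessionName="finops-scanner",
    )["Credentials"]
    return boto3.client(
        service,
        region_name=region,
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )
```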
Conclusion
This script goes beyond what AWS native tools offer, empowering cloud teams to:
- Improve cost visibility
- Take action on idle resources
- Integrate FinOps with engineering workflows
- Automate everything
Additional:
Scanner code
import os
import boto3
from botocore.exceptions import NoCredentialsError, ClientError
from datetime import datetime, timedelta, timezone
import logging
from colorama import init, Fore, Style
import sys
import json
import time
from functools import wraps
# Global variables
RUN_COUNTER = 0
LANG = os.getenv('LANG', 'UA').upper().split('.')[0] # Extract language code before dot
if LANG not in ['EN', 'UA']: # If the language is not supported, fall back to EN
LANG = 'EN'
# Execution time statistics
execution_times = {}
def measure_time(func):
"""Decorator to measure execution time of functions"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
execution_time = end_time - start_time
# Store execution time
execution_times[func.__name__] = execution_time
# Log execution time
logging.info(f"{Fore.CYAN}Function {func.__name__} executed in {execution_time:.2f} seconds{Style.RESET_ALL}")
return result
return wrapper
# Language-specific messages
MESSAGES = {
'EN': {
'log_saved': "Log saved to file: {}",
'logging_error': "Error setting up logging: {}",
'account_id_error': "Error getting account ID: {}",
'region_start': "Starting check in region {} for {}",
'resource_found': "Found unused {}: {} in region {}",
'error_occurred': "Error occurred in region {}: {}",
'check_completed': "Check completed in region {}",
'run_start': "Starting check run #{}",
'run_end': "Check run #{} completed",
'results_saved': "Results saved to file: {}",
'no_instances': "No running instances found in region: {}"
},
'UA': {
'log_saved': "Лог збережено у файл: {}",
'logging_error': "Помилка налаштування логування: {}",
'account_id_error': "Помилка отримання ID акаунту: {}",
'region_start': "Початок перевірки в регіоні {} для {}",
'resource_found': "Знайдено невикористаний {}: {} в регіоні {}",
'error_occurred': "Помилка в регіоні {}: {}",
'check_completed': "Перевірка завершена в регіоні {}",
'run_start': "Початок перевірки #{}",
'run_end': "Перевірка #{} завершена",
'results_saved': "Результати збережено у файл: {}",
'no_instances': "Не знайдено запущених інстансів в регіоні: {}"
}
}
def get_message(key):
"""Get localized message based on LANG setting"""
return MESSAGES[LANG][key]
# Initialize colorama
init()
def get_aws_account_id():
"""Get AWS account ID"""
try:
sts = boto3.client('sts')
return sts.get_caller_identity()['Account']
except Exception as e:
log_error(get_message('account_id_error').format(str(e)))
return "unknown"
def setup_logging():
"""Configure logging with dynamic filename"""
try:
account_id = get_aws_account_id()
current_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
log_filename = f"aws_waste_finder_{account_id}_{current_time}.log"
# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)
# Save file in logs directory
log_filepath = os.path.join('logs', log_filename)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_filepath),
logging.StreamHandler(sys.stdout)
]
)
logging.info(f"{Fore.MAGENTA}{get_message('log_saved').format(log_filepath)}{Style.RESET_ALL}")
except Exception as e:
print(get_message('logging_error').format(str(e)))
# If dynamic filename setup fails, use standard logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('aws_waste_finder.log'),
logging.StreamHandler(sys.stdout)
]
)
# Configure logging
setup_logging()
def log_region_start(region, service):
"""Log start of region check"""
logging.info(f"{Fore.BLUE}{get_message('region_start').format(region, service)}{Style.RESET_ALL}")
def log_unused_resource(resource_type, resource_id, region, details=""):
"""Log found unused resource"""
message = get_message('resource_found').format(resource_type, resource_id, region)
if details:
message += f" ({details})"
logging.info(f"{Fore.YELLOW}{message}{Style.RESET_ALL}")
def log_error(message, region="global"):
"""Log error message"""
if region != "global":
message = get_message('error_occurred').format(region, message)
logging.error(f"{Fore.RED}{message}{Style.RESET_ALL}")
def log_region_end(region):
"""Log end of region check"""
logging.info(f"{Fore.BLUE}{get_message('check_completed').format(region)}{Style.RESET_ALL}")
def log_run_start():
"""Log start of check run"""
logging.info(f"{Fore.MAGENTA}{get_message('run_start').format(RUN_COUNTER)}{Style.RESET_ALL}")
def log_run_end():
"""Log end of check run"""
logging.info(f"{Fore.MAGENTA}{get_message('run_end').format(RUN_COUNTER)}{Style.RESET_ALL}")
def save_results_to_file(results):
"""Save results to JSON file"""
try:
account_id = get_aws_account_id()
current_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
# Create results directory if it doesn't exist
os.makedirs('results', exist_ok=True)
# Save file in results directory
filename = f"results/aws_waste_results_{account_id}_{current_time}.json"
# Add execution times to results
results['execution_times'] = execution_times
with open(filename, 'w') as f:
json.dump(results, f, indent=4, default=str)
logging.info(f"{Fore.GREEN}{get_message('results_saved').format(filename)}{Style.RESET_ALL}")
# Log execution time statistics
logging.info(f"{Fore.MAGENTA}Execution time statistics:{Style.RESET_ALL}")
for func_name, exec_time in execution_times.items():
logging.info(f"{Fore.CYAN}{func_name}: {exec_time:.2f} seconds{Style.RESET_ALL}")
except Exception as e:
log_error(f"Error saving results: {str(e)}")
def get_available_regions(service_name):
""" Get available regions for a given service with STS check. """
try:
# Get all regions for the service
session = boto3.session.Session()
regions = session.get_available_regions(service_name)
# For IAM and S3 return only us-east-1 as they are global services
# if service_name in ['iam', 's3']:
# return ['us-east-1']
# For EKS use only regions where it's available
if service_name == 'eks':
eks_regions = [
'us-east-1', 'us-east-2', 'us-west-2',
'eu-west-1', 'eu-central-1', 'eu-north-1',
'ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2'
]
return [region for region in regions if region in eks_regions]
# Get list of available regions through EC2
ec2 = boto3.client('ec2', region_name='us-east-1')
try:
# Get list of regions where EC2 is available
ec2_regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']]
# Filter regions for current service
available_regions = [region for region in regions if region in ec2_regions]
# Check availability of each region
working_regions = []
for region in available_regions:
try:
# Try to create a client for the region
test_client = boto3.client(service_name, region_name=region)
# Execute a simple request to check availability
if service_name == 'ec2':
test_client.describe_regions()
elif service_name == 'rds':
test_client.describe_db_instances(MaxRecords=20) # 20 is the minimum allowed value for MaxRecords
elif service_name == 'lambda':
test_client.list_functions(MaxItems=1)
elif service_name == 'elasticache':
test_client.describe_cache_clusters(MaxRecords=20) # 20 is the minimum allowed value for MaxRecords
elif service_name == 'dynamodb':
test_client.list_tables(Limit=1)
elif service_name == 'kms':
test_client.list_keys(Limit=1)
elif service_name == 'elbv2':
test_client.describe_load_balancers(PageSize=1)
elif service_name == 'acm':
test_client.list_certificates(MaxItems=1)
working_regions.append(region)
logging.info(f"{Fore.GREEN}Region {region} is available for {service_name}{Style.RESET_ALL}")
except ClientError as e:
if e.response['Error']['Code'] == 'AuthFailure':
logging.warning(f"{Fore.YELLOW}Region {region} is not available: AuthFailure{Style.RESET_ALL}")
else:
logging.warning(f"{Fore.YELLOW}Region {region} is not available: {str(e)}{Style.RESET_ALL}")
except Exception as e:
logging.warning(f"{Fore.YELLOW}Region {region} is not available: {str(e)}{Style.RESET_ALL}")
return working_regions
except ClientError as e:
logging.error(f"{Fore.RED}Error getting EC2 regions: {str(e)}{Style.RESET_ALL}")
# If failed to get regions through EC2, return all regions
return regions
except Exception as e:
log_error(f"Error getting regions for {service_name}: {str(e)}", "global")
return []
def get_regions(service_name):
""" Get all available regions for a given service. """
return get_available_regions(service_name)
@measure_time
def find_ec2_instances():
"""Find all EC2 instances and their status"""
ec2_regions = get_regions('ec2')
instances_info = []
for region in ec2_regions:
log_region_start(region, "EC2")
try:
ec2 = boto3.client('ec2', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
instances = ec2.describe_instances()
if not instances['Reservations']:
logging.info(f"{Fore.YELLOW}No instances found in region: {region}{Style.RESET_ALL}")
else:
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
instance_id = instance['InstanceId']
state = instance['State']['Name']
instance_info = {
'InstanceId': instance_id,
'Region': region,
'State': state,
'InstanceType': instance.get('InstanceType', 'unknown'),
'LaunchTime': instance.get('LaunchTime', 'unknown'),
'Tags': instance.get('Tags', [])
}
if state == 'running':
# Get CPU metrics
cpu_stats = cloudwatch.get_metric_statistics(
Namespace='AWS/EC2',
MetricName='CPUUtilization',
Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
StartTime=datetime.now(timezone.utc) - timedelta(days=7),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Average']
)
if cpu_stats['Datapoints']:
avg_cpu = sum([data_point['Average'] for data_point in cpu_stats['Datapoints']]) / len(cpu_stats['Datapoints'])
instance_info['CPUUtilization'] = round(avg_cpu, 2)
else:
instance_info['CPUUtilization'] = None
instances_info.append(instance_info)
logging.info(f"{Fore.GREEN}EC2 instance: {instance_id} in region {region}, "
f"State: {state}, Type: {instance.get('InstanceType', 'unknown')}, "
f"CPU: {instance_info.get('CPUUtilization', 'N/A')}%{Style.RESET_ALL}")
except ClientError as e:
if e.response['Error']['Code'] == 'AuthFailure':
log_error(f"AuthFailure, skipping...", region)
else:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return instances_info
@measure_time
def find_rds_instances():
"""Find all RDS instances and their status"""
rds_regions = get_regions('rds')
instances_info = []
for region in rds_regions:
log_region_start(region, "RDS")
try:
rds = boto3.client('rds', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
instances = rds.describe_db_instances()
for instance in instances['DBInstances']:
instance_id = instance['DBInstanceIdentifier']
instance_info = {
'DBInstanceIdentifier': instance_id,
'Region': region,
'Status': instance['DBInstanceStatus'],
'Engine': instance.get('Engine', 'unknown'),
'InstanceClass': instance.get('DBInstanceClass', 'unknown'),
'CreationTime': instance.get('InstanceCreateTime', 'unknown')
}
if instance['DBInstanceStatus'] == 'available':
# Get metrics
cpu_stats = cloudwatch.get_metric_statistics(
Namespace='AWS/RDS',
MetricName='CPUUtilization',
Dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': instance_id}],
StartTime=datetime.now(timezone.utc) - timedelta(days=7),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Average']
)
connections = cloudwatch.get_metric_statistics(
Namespace='AWS/RDS',
MetricName='DatabaseConnections',
Dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': instance_id}],
StartTime=datetime.now(timezone.utc) - timedelta(days=7),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Average']
)
if cpu_stats['Datapoints']:
avg_cpu = sum([dp['Average'] for dp in cpu_stats['Datapoints']]) / len(cpu_stats['Datapoints'])
instance_info['CPUUtilization'] = round(avg_cpu, 2)
if connections['Datapoints']:
avg_conn = sum([dp['Average'] for dp in connections['Datapoints']]) / len(connections['Datapoints'])
instance_info['AverageConnections'] = round(avg_conn, 2)
instances_info.append(instance_info)
logging.info(f"{Fore.GREEN}RDS instance: {instance_id} in region {region}, "
f"State: {instance['DBInstanceStatus']}, Type: {instance.get('DBInstanceClass', 'unknown')}, "
f"CPU: {instance_info.get('CPUUtilization', 'N/A')}%, "
f"Connections: {instance_info.get('AverageConnections', 'N/A')}{Style.RESET_ALL}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
log_region_end(region)
return instances_info
@measure_time
def find_eks_clusters():
"""Find all EKS clusters and their status"""
eks_regions = get_regions('eks')
clusters_info = []
for region in eks_regions:
log_region_start(region, "EKS")
try:
eks = boto3.client('eks', region_name=region)
clusters = eks.list_clusters()
for cluster_name in clusters['clusters']:
cluster = eks.describe_cluster(name=cluster_name)['cluster']
nodegroups = eks.list_nodegroups(clusterName=cluster_name)
cluster_info = {
'ClusterName': cluster_name,
'Region': region,
'Status': cluster['status'],
'Version': cluster.get('version', 'unknown'),
'CreatedAt': cluster.get('createdAt', 'unknown'),
'NodeGroups': len(nodegroups['nodegroups']) if 'nodegroups' in nodegroups else 0
}
clusters_info.append(cluster_info)
logging.info(f"{Fore.GREEN}EKS cluster: {cluster_name} in region {region}, "
f"Status: {cluster['status']}, Version: {cluster.get('version', 'unknown')}, "
f"Node Groups: {cluster_info['NodeGroups']}{Style.RESET_ALL}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return clusters_info
@measure_time
def find_unused_lambda_functions():
""" Find underutilized or idle Lambda functions. """
lambda_regions = get_regions('lambda')
idle_lambda = []
for region in lambda_regions:
log_region_start(region, "Lambda")
try:
lambda_client = boto3.client('lambda', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
functions = lambda_client.list_functions()
for function in functions['Functions']:
invocations = cloudwatch.get_metric_statistics(
Namespace='AWS/Lambda',
MetricName='Invocations',
Dimensions=[
{
'Name': 'FunctionName',
'Value': function['FunctionName']
}
],
StartTime=datetime.now(timezone.utc) - timedelta(days=7),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Sum']
)
total_invocations = sum([data_point['Sum'] for data_point in invocations['Datapoints']])
if total_invocations == 0:
idle_lambda.append({'FunctionName': function['FunctionName'], 'Region': region})
log_unused_resource("Lambda function", function['FunctionName'], region)
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return idle_lambda
@measure_time
def find_unused_elasticache_clusters():
""" Find underutilized or idle ElastiCache clusters. """
idle_elasticache = []
elasticache_regions = get_regions('elasticache')
for region in elasticache_regions:
log_region_start(region, "ElastiCache")
try:
elasticache = boto3.client('elasticache', region_name=region)
# Get all cache clusters with max records set to 100
try:
response = elasticache.describe_cache_clusters(MaxRecords=100)
clusters = response.get('CacheClusters', [])
while 'Marker' in response:
response = elasticache.describe_cache_clusters(
MaxRecords=100,
Marker=response['Marker']
)
clusters.extend(response.get('CacheClusters', []))
for cluster in clusters:
cluster_id = cluster['CacheClusterId']
# Get CloudWatch metrics for the cluster
cloudwatch = boto3.client('cloudwatch', region_name=region)
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=30)
# Check CPU utilization
cpu_response = cloudwatch.get_metric_statistics(
Namespace='AWS/ElastiCache',
MetricName='CPUUtilization',
Dimensions=[
{
'Name': 'CacheClusterId',
'Value': cluster_id
}
],
StartTime=start_time,
EndTime=end_time,
Period=86400, # 1 day
Statistics=['Average']
)
# If no CPU data or average CPU < 5%, consider it idle
if not cpu_response['Datapoints'] or \
all(dp['Average'] < 5.0 for dp in cpu_response['Datapoints']):
idle_elasticache.append({
'type': 'ElastiCache',
'id': cluster_id,
'region': region,
'status': cluster.get('CacheClusterStatus', 'unknown'),
'node_type': cluster.get('CacheNodeType', 'unknown'),
'engine': cluster.get('Engine', 'unknown'),
'creation_time': cluster.get('CacheClusterCreateTime', 'unknown')
})
log_unused_resource('ElastiCache', cluster_id, region,
f"Status: {cluster.get('CacheClusterStatus', 'unknown')}, "
f"Node Type: {cluster.get('CacheNodeType', 'unknown')}")
except ClientError as e:
if e.response['Error']['Code'] == 'InvalidClientTokenId':
logging.warning(f"{Fore.YELLOW}Region {region} is not available: InvalidClientTokenId{Style.RESET_ALL}")
continue
elif e.response['Error']['Code'] == 'AuthFailure':
logging.warning(f"{Fore.YELLOW}Region {region} is not available: AuthFailure{Style.RESET_ALL}")
continue
else:
raise
except ClientError as e:
if e.response['Error']['Code'] in ['InvalidClientTokenId', 'AuthFailure']:
logging.warning(f"{Fore.YELLOW}Region {region} is not available: {e.response['Error']['Code']}{Style.RESET_ALL}")
else:
log_error(f"Error checking ElastiCache in {region}: {str(e)}", region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
log_region_end(region)
return idle_elasticache
@measure_time
def find_unused_ec2_snapshots():
""" Find unused EC2 snapshots older than 30 days. """
ec2_regions = get_regions('ec2')
unused_snapshots = []
cutoff_date = datetime.now(timezone.utc) - timedelta(days=30)
for region in ec2_regions:
log_region_start(region, "EC2 Snapshots")
try:
ec2 = boto3.client('ec2', region_name=region)
snapshots = ec2.describe_snapshots(OwnerIds=['self'])
for snapshot in snapshots['Snapshots']:
if snapshot['StartTime'] < cutoff_date:
unused_snapshots.append(
{'SnapshotId': snapshot['SnapshotId'], 'Region': region, 'StartTime': snapshot['StartTime']})
log_unused_resource("EC2 snapshot", snapshot['SnapshotId'], region,
f"Created: {snapshot['StartTime'].strftime('%Y-%m-%d')}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_snapshots
@measure_time
def find_unused_rds_snapshots():
""" Find unused RDS snapshots older than 30 days. """
rds_regions = get_regions('rds')
unused_snapshots = []
cutoff_date = datetime.now(timezone.utc) - timedelta(days=30)
for region in rds_regions:
log_region_start(region, "RDS Snapshots")
try:
rds = boto3.client('rds', region_name=region)
snapshots = rds.describe_db_snapshots(SnapshotType='manual')
for snapshot in snapshots['DBSnapshots']:
if snapshot['SnapshotCreateTime'] < cutoff_date:
unused_snapshots.append({'DBSnapshotIdentifier': snapshot['DBSnapshotIdentifier'], 'Region': region,
'SnapshotCreateTime': snapshot['SnapshotCreateTime']})
log_unused_resource("RDS snapshot", snapshot['DBSnapshotIdentifier'], region,
f"Created: {snapshot['SnapshotCreateTime'].strftime('%Y-%m-%d')}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_snapshots
@measure_time
def find_unused_elasticache_snapshots():
""" Find unused ElastiCache snapshots older than 30 days. """
elasticache_regions = get_regions('elasticache')
unused_snapshots = []
cutoff_date = datetime.now(timezone.utc) - timedelta(days=30)
for region in elasticache_regions:
elasticache = boto3.client('elasticache', region_name=region)
snapshots = elasticache.describe_snapshots()
for snapshot in snapshots['Snapshots']:
if 'SnapshotCreateTime' in snapshot and snapshot['SnapshotCreateTime'] < cutoff_date:
unused_snapshots.append({'SnapshotName': snapshot['SnapshotName'], 'Region': region,
'SnapshotCreateTime': snapshot['SnapshotCreateTime']})
return unused_snapshots
@measure_time
def find_unused_kms_keys():
""" Find unused KMS keys. """
kms_regions = get_regions('kms')
unused_keys = []
for region in kms_regions:
log_region_start(region, "KMS")
try:
kms = boto3.client('kms', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
keys = kms.list_keys()
for key in keys['Keys']:
key_id = key['KeyId']
key_info = kms.describe_key(KeyId=key_id)
metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/KMS',
MetricName='KeyUsage',
Dimensions=[{'Name': 'KeyId', 'Value': key_id}],
StartTime=datetime.now(timezone.utc) - timedelta(days=30),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Sum']
)
if not metrics['Datapoints'] or sum([dp['Sum'] for dp in metrics['Datapoints']]) == 0:
unused_keys.append({
'KeyId': key_id,
'Region': region,
'Description': key_info['KeyMetadata'].get('Description', 'No description')
})
log_unused_resource("KMS key", key_id, region, key_info['KeyMetadata'].get('Description', 'No description'))
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_keys
@measure_time
def find_unused_s3_buckets():
""" Find unused S3 buckets based on activity metrics and last access time. """
unused_buckets = []
cutoff_date = datetime.now(timezone.utc) - timedelta(days=90)
log_region_start("global", "S3")
try:
s3 = boto3.client('s3')
cloudwatch = boto3.client('cloudwatch', region_name='us-east-1') # S3 metrics are in us-east-1
buckets = s3.list_buckets()
for bucket in buckets['Buckets']:
bucket_name = bucket['Name']
try:
# Check activity metrics for the last 90 days
metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/S3',
MetricName='NumberOfObjects',
Dimensions=[
{
'Name': 'BucketName',
'Value': bucket_name
},
{
'Name': 'StorageType',
'Value': 'AllStorageTypes'
}
],
StartTime=datetime.now(timezone.utc) - timedelta(days=90),
EndTime=datetime.now(timezone.utc),
Period=86400, # 1 day
Statistics=['Sum']
)
# Check request metrics
request_metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/S3',
MetricName='AllRequests',
Dimensions=[
{
'Name': 'BucketName',
'Value': bucket_name
}
],
StartTime=datetime.now(timezone.utc) - timedelta(days=90),
EndTime=datetime.now(timezone.utc),
Period=86400,
Statistics=['Sum']
)
# Check if bucket is empty
objects = s3.list_objects_v2(Bucket=bucket_name, MaxKeys=1)
is_empty = not objects.get('Contents')
# Check if bucket is not used
no_activity = (
(not metrics['Datapoints'] or sum([dp['Sum'] for dp in metrics['Datapoints']]) == 0) and
(not request_metrics['Datapoints'] or sum([dp['Sum'] for dp in request_metrics['Datapoints']]) == 0)
)
if is_empty and no_activity:
unused_buckets.append({
'BucketName': bucket_name,
'CreationDate': bucket['CreationDate'].strftime('%Y-%m-%d'),
'LastActivity': 'No activity in last 90 days'
})
log_unused_resource("S3 bucket", bucket_name, "global",
f"Created: {bucket['CreationDate'].strftime('%Y-%m-%d')}, "
f"No activity in last 90 days")
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchBucket':
continue
log_error(f"Error checking bucket {bucket_name}: {str(e)}")
except Exception as e:
log_error(f"Unexpected error checking bucket {bucket_name}: {str(e)}")
except ClientError as e:
log_error(str(e))
except Exception as e:
log_error(f"Unknown error: {str(e)}")
return unused_buckets
@measure_time
def find_unused_dynamodb_tables():
""" Find unused DynamoDB tables. """
dynamodb_regions = get_regions('dynamodb')
unused_tables = []
for region in dynamodb_regions:
log_region_start(region, "DynamoDB")
try:
dynamodb = boto3.client('dynamodb', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
tables = dynamodb.list_tables()
for table_name in tables['TableNames']:
metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/DynamoDB',
MetricName='ConsumedReadCapacityUnits',
Dimensions=[{'Name': 'TableName', 'Value': table_name}],
StartTime=datetime.now(timezone.utc) - timedelta(days=30),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Sum']
)
if not metrics['Datapoints'] or sum([dp['Sum'] for dp in metrics['Datapoints']]) == 0:
unused_tables.append({
'TableName': table_name,
'Region': region
})
log_unused_resource("DynamoDB table", table_name, region)
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_tables
@measure_time
def find_unused_iam_roles():
"""Find IAM roles not used in the last 90 days"""
unused_roles = []
cutoff_date = datetime.now(timezone.utc) - timedelta(days=90)
try:
iam = boto3.client('iam')
roles = iam.list_roles()
for role in roles['Roles']:
try:
role_name = role['RoleName']
role_info = iam.get_role(RoleName=role_name)
# Check if role has been used
last_used = role_info['Role'].get('RoleLastUsed', {}).get('LastUsedDate')
if not last_used or last_used < cutoff_date:
unused_roles.append({
'RoleName': role_name,
'LastUsed': last_used.strftime('%Y-%m-%d') if last_used else 'Never'
})
log_unused_resource("IAM role", role_name, "global",
f"Last used: {last_used.strftime('%Y-%m-%d') if last_used else 'Never'}")
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchEntity':
continue
log_error(f"Error checking role {role_name}: {str(e)}")
except Exception as e:
log_error(f"Unexpected error checking role {role_name}: {str(e)}")
except ClientError as e:
log_error(f"Error listing IAM roles: {str(e)}")
except Exception as e:
log_error(f"Unknown error in find_unused_iam_roles: {str(e)}")
return unused_roles
@measure_time
def find_unused_iam_policies():
""" Find unused IAM policies. """
iam = boto3.client('iam')
unused_policies = []
log_region_start("global", "IAM Policies")
try:
policies = iam.list_policies(Scope='Local')
for policy in policies['Policies']:
policy_arn = policy['Arn']
try:
entities = iam.list_entities_for_policy(PolicyArn=policy_arn)
if not (entities['PolicyGroups'] or entities['PolicyUsers'] or entities['PolicyRoles']):
unused_policies.append({
'PolicyName': policy['PolicyName'],
'PolicyId': policy['PolicyId']
})
log_unused_resource("IAM policy", policy['PolicyName'], "global", f"ID: {policy['PolicyId']}")
except ClientError as e:
log_error(f"Error checking policy {policy['PolicyName']}: {e}")
except ClientError as e:
log_error(str(e))
except Exception as e:
log_error(f"Unknown error: {str(e)}")
return unused_policies
@measure_time
def find_unused_ebs_volumes():
""" Find unused EBS volumes. """
ec2_regions = get_regions('ec2')
unused_volumes = []
for region in ec2_regions:
log_region_start(region, "EBS")
try:
ec2 = boto3.client('ec2', region_name=region)
volumes = ec2.describe_volumes(
Filters=[
{
'Name': 'status',
'Values': ['available']
}
]
)
for volume in volumes['Volumes']:
if volume['State'] == 'available':
unused_volumes.append({
'VolumeId': volume['VolumeId'],
'Region': region,
'Size': volume['Size'],
'CreateTime': volume['CreateTime']
})
log_unused_resource("EBS volume", volume['VolumeId'], region,
f"Size: {volume['Size']}GB, Created: {volume['CreateTime'].strftime('%Y-%m-%d')}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_volumes
@measure_time
def find_unused_amis():
""" Find unused AMIs older than 30 days. """
ec2_regions = get_regions('ec2')
unused_amis = []
cutoff_date = datetime.now(timezone.utc) - timedelta(days=30)
for region in ec2_regions:
log_region_start(region, "AMI")
try:
ec2 = boto3.client('ec2', region_name=region)
amis = ec2.describe_images(Owners=['self'])
for ami in amis['Images']:
creation_date = datetime.strptime(ami['CreationDate'], '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
if creation_date < cutoff_date:
unused_amis.append({
'ImageId': ami['ImageId'],
'Region': region,
'Name': ami.get('Name', 'No name'),
'CreationDate': creation_date
})
log_unused_resource("AMI", ami['ImageId'], region,
f"Name: {ami.get('Name', 'No name')}, Created: {creation_date.strftime('%Y-%m-%d')}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_amis
@measure_time
def find_unused_alb():
""" Find unused Application Load Balancers. """
elbv2_regions = get_regions('elbv2')
unused_alb = []
for region in elbv2_regions:
log_region_start(region, "ALB")
try:
elbv2 = boto3.client('elbv2', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
load_balancers = elbv2.describe_load_balancers()
for lb in load_balancers['LoadBalancers']:
if lb['Type'] == 'application':
metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/ApplicationELB',
MetricName='RequestCount',
Dimensions=[
{
'Name': 'LoadBalancer',
'Value': lb['LoadBalancerArn'].split('/')[-1]
}
],
StartTime=datetime.now(timezone.utc) - timedelta(days=7),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Sum']
)
if not metrics['Datapoints'] or sum([dp['Sum'] for dp in metrics['Datapoints']]) == 0:
unused_alb.append({
'LoadBalancerName': lb['LoadBalancerName'],
'Region': region,
'DNSName': lb['DNSName']
})
log_unused_resource("ALB", lb['LoadBalancerName'], region, f"DNS: {lb['DNSName']}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_alb
@measure_time
def find_unused_target_groups():
""" Find unused Target Groups. """
elbv2_regions = get_regions('elbv2')
unused_target_groups = []
for region in elbv2_regions:
log_region_start(region, "Target Groups")
try:
elbv2 = boto3.client('elbv2', region_name=region)
cloudwatch = boto3.client('cloudwatch', region_name=region)
target_groups = elbv2.describe_target_groups()
for tg in target_groups['TargetGroups']:
metrics = cloudwatch.get_metric_statistics(
Namespace='AWS/ApplicationELB',
MetricName='RequestCount',
Dimensions=[
{
'Name': 'TargetGroup',
'Value': tg['TargetGroupArn'].split('/')[-1]
}
],
StartTime=datetime.now(timezone.utc) - timedelta(days=7),
EndTime=datetime.now(timezone.utc),
Period=3600,
Statistics=['Sum']
)
if not metrics['Datapoints'] or sum([dp['Sum'] for dp in metrics['Datapoints']]) == 0:
unused_target_groups.append({
'TargetGroupName': tg['TargetGroupName'],
'Region': region,
'Protocol': tg['Protocol'],
'Port': tg['Port']
})
log_unused_resource("Target Group", tg['TargetGroupName'], region,
f"Protocol: {tg['Protocol']}, Port: {tg['Port']}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_target_groups
@measure_time
def find_unused_acm_certificates():
""" Find unused ACM certificates. """
acm_regions = get_regions('acm')
unused_certificates = []
for region in acm_regions:
log_region_start(region, "ACM")
try:
acm = boto3.client('acm', region_name=region)
certificates = acm.list_certificates()
for cert in certificates['CertificateSummaryList']:
cert_arn = cert['CertificateArn']
cert_detail = acm.describe_certificate(CertificateArn=cert_arn)
# Check if certificate is used
in_use = False
if 'InUseBy' in cert_detail['Certificate'] and cert_detail['Certificate']['InUseBy']:
in_use = True
if not in_use:
unused_certificates.append({
'CertificateArn': cert_arn,
'Region': region,
'DomainName': cert['DomainName'],
'Status': cert_detail['Certificate']['Status']
})
log_unused_resource("ACM certificate", cert['DomainName'], region,
f"Status: {cert_detail['Certificate']['Status']}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_certificates
@measure_time
def find_unused_vpc_resources():
""" Find unused VPC resources. """
ec2_regions = get_regions('ec2')
unused_resources = {
'vpc': [],
'subnets': [],
'security_groups': [],
'network_interfaces': []
}
for region in ec2_regions:
log_region_start(region, "VPC")
try:
ec2 = boto3.client('ec2', region_name=region)
# Get list of default VPCs
default_vpcs = []
vpcs = ec2.describe_vpcs()
for vpc in vpcs['Vpcs']:
if vpc['CidrBlock'] == '172.31.0.0/16' or vpc['IsDefault']:
default_vpcs.append(vpc['VpcId'])
# Check VPC
for vpc in vpcs['Vpcs']:
vpc_id = vpc['VpcId']
# Skip default VPCs
if vpc_id in default_vpcs:
continue
# Check if VPC is used
instances = ec2.describe_instances(Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}])
if not instances['Reservations']:
unused_resources['vpc'].append({
'VpcId': vpc_id,
'Region': region,
'CidrBlock': vpc['CidrBlock']
})
log_unused_resource("VPC", vpc_id, region, f"CIDR: {vpc['CidrBlock']}")
# Check subnet
subnets = ec2.describe_subnets()
for subnet in subnets['Subnets']:
subnet_id = subnet['SubnetId']
# Skip subnets in default VPCs
if subnet['VpcId'] in default_vpcs:
continue
# Check if subnet is used
instances = ec2.describe_instances(Filters=[{'Name': 'subnet-id', 'Values': [subnet_id]}])
if not instances['Reservations']:
unused_resources['subnets'].append({
'SubnetId': subnet_id,
'Region': region,
'CidrBlock': subnet['CidrBlock']
})
log_unused_resource("Subnet", subnet_id, region, f"CIDR: {subnet['CidrBlock']}")
# Check security groups
security_groups = ec2.describe_security_groups()
for sg in security_groups['SecurityGroups']:
sg_id = sg['GroupId']
# Skip default security groups
if sg['GroupName'] == 'default' or sg['VpcId'] in default_vpcs:
continue
# Check if security group is used
instances = ec2.describe_instances(Filters=[{'Name': 'instance.group-id', 'Values': [sg_id]}])
if not instances['Reservations']:
unused_resources['security_groups'].append({
'GroupId': sg_id,
'Region': region,
'GroupName': sg['GroupName']
})
log_unused_resource("Security Group", sg_id, region, f"Name: {sg['GroupName']}")
# Check network interfaces
network_interfaces = ec2.describe_network_interfaces()
for ni in network_interfaces['NetworkInterfaces']:
# Skip network interfaces in default VPCs
if ni['VpcId'] in default_vpcs:
continue
if ni['Status'] == 'available':
unused_resources['network_interfaces'].append({
'NetworkInterfaceId': ni['NetworkInterfaceId'],
'Region': region,
'SubnetId': ni['SubnetId']
})
log_unused_resource("Network Interface", ni['NetworkInterfaceId'], region,
f"Subnet: {ni['SubnetId']}")
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_resources
@measure_time
def find_unused_ec2_resources():
""" Find unused EC2 resources. """
ec2_regions = get_regions('ec2')
unused_resources = {
'elastic_ips': [],
'placement_groups': [],
'dedicated_hosts': []
}
for region in ec2_regions:
log_region_start(region, "EC2 Resources")
try:
ec2 = boto3.client('ec2', region_name=region)
# Check Elastic IPs
addresses = ec2.describe_addresses()
for address in addresses['Addresses']:
if 'InstanceId' not in address:
unused_resources['elastic_ips'].append({
'AllocationId': address['AllocationId'],
'Region': region,
'PublicIp': address['PublicIp']
})
log_unused_resource("Elastic IP", address['AllocationId'], region,
f"IP: {address['PublicIp']}")
# Check Placement Groups
placement_groups = ec2.describe_placement_groups()
for pg in placement_groups['PlacementGroups']:
instances = ec2.describe_instances(Filters=[{'Name': 'placement-group-name', 'Values': [pg['GroupName']]}])
if not instances['Reservations']:
unused_resources['placement_groups'].append({
'GroupName': pg['GroupName'],
'Region': region,
'Strategy': pg['Strategy']
})
log_unused_resource("Placement Group", pg['GroupName'], region,
f"Strategy: {pg['Strategy']}")
# Check Dedicated Hosts
try:
hosts = ec2.describe_hosts()
for host in hosts['Hosts']:
if host['State'] == 'available' and not host.get('Instances'):
unused_resources['dedicated_hosts'].append({
'HostId': host['HostId'],
'Region': region,
'InstanceFamily': host.get('HostProperties', {}).get('InstanceFamily', 'unknown')
})
log_unused_resource("Dedicated Host", host['HostId'], region,
f"Family: {host.get('HostProperties', {}).get('InstanceFamily', 'unknown')}")
except ClientError as e:
if e.response['Error']['Code'] == 'InvalidParameterValue':
logging.warning(f"{Fore.YELLOW}Dedicated Hosts not supported in region {region}{Style.RESET_ALL}")
else:
raise
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
return unused_resources
@measure_time
def find_unused_iam_resources():
""" Find unused IAM resources. """
iam = boto3.client('iam')
unused_resources = {
'users': [],
'groups': [],
'access_keys': []
}
log_region_start("global", "IAM Resources")
try:
# Check IAM Users
users = iam.list_users()
for user in users['Users']:
user_name = user['UserName']
last_used = iam.get_user(UserName=user_name)['User'].get('PasswordLastUsed')
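# Note: PasswordLastUsed only reflects console sign-ins; users that work purely through access keys can still be active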
if not last_used or (datetime.now(timezone.utc) - last_used.replace(tzinfo=timezone.utc)).days > 90:
unused_resources['users'].append({
'UserName': user_name,
'LastUsed': last_used.strftime('%Y-%m-%d') if last_used else 'Never'
})
log_unused_resource("IAM User", user_name, "global",
f"Last used: {last_used.strftime('%Y-%m-%d') if last_used else 'Never'}")
# Check IAM Groups
groups = iam.list_groups()
for group in groups['Groups']:
group_name = group['GroupName']
# Use a separate variable here; reusing "users" would overwrite the full user list
# that the access key check below still relies on
group_members = iam.get_group(GroupName=group_name)
if not group_members['Users']:
unused_resources['groups'].append({
'GroupName': group_name,
'CreateDate': group['CreateDate'].strftime('%Y-%m-%d')
})
log_unused_resource("IAM Group", group_name, "global",
f"Created: {group['CreateDate'].strftime('%Y-%m-%d')}")
# Check Access Keys
for user in users['Users']:
access_keys = iam.list_access_keys(UserName=user['UserName'])
for key in access_keys['AccessKeyMetadata']:
last_used = iam.get_access_key_last_used(AccessKeyId=key['AccessKeyId'])
if not last_used['AccessKeyLastUsed'].get('LastUsedDate'):
unused_resources['access_keys'].append({
'AccessKeyId': key['AccessKeyId'],
'UserName': user['UserName'],
'CreateDate': key['CreateDate'].strftime('%Y-%m-%d')
})
log_unused_resource("Access Key", key['AccessKeyId'], "global",
f"User: {user['UserName']}")
except ClientError as e:
log_error(str(e))
except Exception as e:
log_error(f"Unknown error: {str(e)}")
return unused_resources
@measure_time
def find_unused_ecr_repositories():
""" Find unused ECR repositories. """
ecr_regions = get_regions('ecr')
unused_repositories = []
for region in ecr_regions:
log_region_start(region, "ECR")
try:
ecr = boto3.client('ecr', region_name=region)
repositories = ecr.describe_repositories()
for repo in repositories['repositories']:
repo_name = repo['repositoryName']
# Get image details
try:
# describe_images is paginated, so collect all pages; a single call may return only the first page of images
images = {'imageDetails': []}
for page in ecr.get_paginator('describe_images').paginate(repositoryName=repo_name):
    images['imageDetails'].extend(page['imageDetails'])
# Check if repository has any images
if not images['imageDetails']:
unused_repositories.append({
'RepositoryName': repo_name,
'Region': region,
'CreatedAt': repo['createdAt'].strftime('%Y-%m-%d')
})
log_unused_resource("ECR repository", repo_name, region,
f"Created: {repo['createdAt'].strftime('%Y-%m-%d')}")
else:
# Check last image push time
last_push = max(img['imagePushedAt'] for img in images['imageDetails'])
if (datetime.now(timezone.utc) - last_push.replace(tzinfo=timezone.utc)).days > 90:
unused_repositories.append({
'RepositoryName': repo_name,
'Region': region,
'LastPush': last_push.strftime('%Y-%m-%d'),
'ImageCount': len(images['imageDetails'])
})
log_unused_resource("ECR repository", repo_name, region,
f"Last push: {last_push.strftime('%Y-%m-%d')}, "
f"Images: {len(images['imageDetails'])}")
except ecr.exceptions.RepositoryNotFoundException:
continue
except Exception as e:
log_error(f"Error checking repository {repo_name}: {str(e)}", region)
except ClientError as e:
log_error(str(e), region)
except Exception as e:
log_error(f"Unknown error: {str(e)}", region)
log_region_end(region)
return unused_repositories
if __name__ == "__main__":
RUN_COUNTER += 1
log_run_start()
logging.info(f"{Fore.MAGENTA}Starting AWS resources check{Style.RESET_ALL}")
start_time = time.time()
try:
all_results = {
'ec2_instances': find_ec2_instances(),
'rds_instances': find_rds_instances(),
'eks_clusters': find_eks_clusters(),
'lambda_functions': find_unused_lambda_functions(),
'elasticache_clusters': find_unused_elasticache_clusters(),
'ec2_snapshots': find_unused_ec2_snapshots(),
'rds_snapshots': find_unused_rds_snapshots(),
'elasticache_snapshots': find_unused_elasticache_snapshots(),
'kms_keys': find_unused_kms_keys(),
's3_buckets': find_unused_s3_buckets(),
'dynamodb_tables': find_unused_dynamodb_tables(),
'iam_roles': find_unused_iam_roles(),
'iam_policies': find_unused_iam_policies(),
'ebs_volumes': find_unused_ebs_volumes(),
'amis': find_unused_amis(),
'alb': find_unused_alb(),
'target_groups': find_unused_target_groups(),
'acm_certificates': find_unused_acm_certificates(),
'vpc_resources': find_unused_vpc_resources(),
'ec2_resources': find_unused_ec2_resources(),
'iam_resources': find_unused_iam_resources(),
'ecr_repositories': find_unused_ecr_repositories()
}
save_results_to_file(all_results)
total_resources = sum(len(v) if isinstance(v, list) else sum(len(x) for x in v.values() if isinstance(x, list)) for v in all_results.values())
total_execution_time = time.time() - start_time
hours, remainder = divmod(total_execution_time, 3600)
minutes, seconds = divmod(remainder, 60)
logging.info(f"{Fore.MAGENTA}Found total of {total_resources} resources{Style.RESET_ALL}")
logging.info(f"{Fore.MAGENTA}Total execution time: {int(hours)}h {int(minutes)}m {seconds:.2f}s{Style.RESET_ALL}")
logging.info(f"{Fore.MAGENTA}Execution time statistics:{Style.RESET_ALL}")
for func_name, exec_time in execution_times.items():
logging.info(f"{Fore.CYAN}{func_name}: {exec_time:.2f} seconds{Style.RESET_ALL}")
except NoCredentialsError:
log_error("AWS credentials not found. Please configure your AWS credentials.")
except ClientError as e:
log_error(f"AWS API error: {e}")
except Exception as e:
log_error(f"Unknown error: {e}")
finally:
log_run_end()
Code Optimization Opportunities
The code presented in this article was written quite some time ago and has its shortcomings. While the scanner is functional, several optimizations could significantly improve its performance, scalability, and maintainability. This section outlines the key areas for improvement, with code examples. I'd be glad to hear your feedback, and I hope the article proves useful to you.
Parallel Processing
The current implementation scans each region and service sequentially, which can lead to long execution times. Using parallel processing can dramatically improve performance:
import concurrent.futures
def scan_all_regions(service, scan_function):
"""Scan regions in parallel using ThreadPoolExecutor"""
regions = get_regions(service)
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
future_to_region = {executor.submit(scan_function, region): region for region in regions}
for future in concurrent.futures.as_completed(future_to_region):
region = future_to_region[future]
try:
result = future.result()
results.extend(result)
except Exception as e:
log_error(f"Error in {region}: {str(e)}", region)
return results
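As a usage sketch, assume the per-region logic has been factored out into a standalone helper that takes a region name and returns a list of findings; the `find_unused_ebs_volumes_in_region` function below is a hypothetical example, not part of the original scanner:
import boto3

def find_unused_ebs_volumes_in_region(region):
    """Hypothetical per-region check: unattached ('available') EBS volumes"""
    ec2 = boto3.client('ec2', region_name=region)
    volumes = []
    paginator = ec2.get_paginator('describe_volumes')
    for page in paginator.paginate(Filters=[{'Name': 'status', 'Values': ['available']}]):
        for volume in page['Volumes']:
            volumes.append({'VolumeId': volume['VolumeId'], 'Region': region, 'SizeGiB': volume['Size']})
    return volumes

# All regions are then scanned concurrently instead of one after another
unused_volumes = scan_all_regions('ec2', find_unused_ebs_volumes_in_region)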
Efficient CloudWatch Queries
Replace multiple individual `get_metric_statistics` calls with batched `get_metric_data` requests:
def get_multiple_metrics(cloudwatch, namespace, metric_names, dimensions, days=7):
"""Get multiple CloudWatch metrics in a single API call"""
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(days=days)
queries = []
for i, metric_name in enumerate(metric_names):
queries.append({
'Id': f'metric{i}',
'MetricStat': {
'Metric': {
'Namespace': namespace,
'MetricName': metric_name,
'Dimensions': dimensions
},
'Period': 3600,
'Stat': 'Average'
},
'ReturnData': True
})
return cloudwatch.get_metric_data(
MetricDataQueries=queries,
StartTime=start_time,
EndTime=end_time
)
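A minimal usage sketch follows; the region, function name, and idleness threshold are illustrative assumptions. The response is read from `MetricDataResults`, where each entry corresponds to one query `Id` in the order the queries were built:
import boto3

cloudwatch = boto3.client('cloudwatch', region_name='eu-central-1')  # region is a placeholder
response = get_multiple_metrics(
    cloudwatch,
    namespace='AWS/Lambda',
    metric_names=['Invocations', 'Errors'],
    dimensions=[{'Name': 'FunctionName', 'Value': 'my-function'}],  # hypothetical function name
    days=7
)
# metric0 corresponds to 'Invocations', metric1 to 'Errors'
invocations = sum(response['MetricDataResults'][0]['Values'])
if invocations == 0:
    print("No invocations in the last 7 days - candidate for review")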
Proper Pagination
Ensure all API calls handle pagination correctly:
def list_all_instances(region):
"""List all EC2 instances with proper pagination"""
ec2 = boto3.client('ec2', region_name=region)
paginator = ec2.get_paginator('describe_instances')
instances = []
for page in paginator.paginate():
for reservation in page['Reservations']:
instances.extend(reservation['Instances'])
return instances
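The same pattern applies to most of the list/describe calls in the scanner. For example, a sketch for IAM users (the original code calls `list_users` once, which returns only the first page of results in larger accounts):
import boto3

def list_all_iam_users():
    """List every IAM user, following pagination"""
    iam = boto3.client('iam')
    users = []
    for page in iam.get_paginator('list_users').paginate():
        users.extend(page['Users'])
    return users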
Object-Oriented Approach
Refactor the code to use classes for better organization and reusability:
class ResourceScanner:
"""Base scanner class with common functionality"""
def __init__(self, region):
self.region = region
def scan(self):
raise NotImplementedError("Subclasses must implement scan method")
class EC2Scanner(ResourceScanner):
"""Scanner for EC2 resources"""
def scan(self):
# EC2 specific scanning logic
pass
class RDSScanner(ResourceScanner):
"""Scanner for RDS resources"""
def scan(self):
# RDS specific scanning logic
pass
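A short sketch of how such scanners could then be orchestrated; the `run_scanners` helper below is illustrative, and the class bodies above are still skeletons:
def run_scanners(region):
    """Run every registered scanner for a single region and collect the findings"""
    scanners = [EC2Scanner(region), RDSScanner(region)]
    findings = {}
    for scanner in scanners:
        findings[type(scanner).__name__] = scanner.scan()
    return findings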
Rate Limiting and Error Handling
Implement rate limiting to avoid API throttling:
import time
from functools import wraps
def rate_limited(max_per_second):
"""Decorator to limit the rate of API calls"""
min_interval = 1.0 / max_per_second
last_time_called = [0.0]
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_time_called[0]
to_sleep = min_interval - elapsed
if to_sleep > 0:
time.sleep(to_sleep)
result = func(*args, **kwargs)
last_time_called[0] = time.time()
return result
return wrapper
return decorator
@rate_limited(5) # Maximum 5 calls per second
def describe_instances(client):
return client.describe_instances()
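Note that this simple decorator keeps its state in a shared list and is not thread-safe, so it combines poorly with the ThreadPoolExecutor approach above. An alternative worth considering is to let botocore handle throttling and retries through the client configuration, for example:
import boto3
from botocore.config import Config

# Adaptive retry mode adds client-side rate limiting on top of exponential backoff
boto_config = Config(retries={'max_attempts': 10, 'mode': 'adaptive'})
ec2 = boto3.client('ec2', region_name='eu-central-1', config=boto_config)  # region is a placeholder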
Caching and Memoization
Implement caching to avoid duplicate API calls:
import functools
@functools.lru_cache(maxsize=128)
def get_service_regions(service_name):
"""Cache available regions for each service"""
session = boto3.session.Session()
return session.get_available_regions(service_name)
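The same idea can be applied to client creation, which the scanner currently repeats in every function; a minimal sketch:
import functools
import boto3

@functools.lru_cache(maxsize=None)
def get_client(service_name, region):
    """Create each boto3 client once per (service, region) pair and reuse it"""
    return boto3.client(service_name, region_name=region)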
Separation of Concerns
Separate business logic from presentation:
# Logger class to handle all output formatting
class ColorLogger:
def __init__(self, log_level=logging.INFO):
self.logger = logging.getLogger('scanner')
self.logger.setLevel(log_level)
def info(self, message, color=None):
if color:
self.logger.info(f"{color}{message}{Style.RESET_ALL}")
else:
self.logger.info(message)
def error(self, message):
self.logger.error(f"{Fore.RED}{message}{Style.RESET_ALL}")
Memory Optimization
For large AWS environments, process results in batches with a generator so that memory usage stays bounded instead of materializing everything at once:
def process_resources_in_batches(resources, batch_size=100):
"""Process large collections in manageable batches"""
for i in range(0, len(resources), batch_size):
batch = resources[i:i+batch_size]
yield batch
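For example, findings can be appended to a JSON Lines file batch by batch instead of being accumulated in one large structure; the variable and file names below are illustrative:
import json

# all_findings: a list of finding dicts collected by the scanners (assumed)
with open('unused_resources.jsonl', 'w') as report:
    for batch in process_resources_in_batches(all_findings, batch_size=100):
        for item in batch:
            report.write(json.dumps(item, default=str) + "\n")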
Implementing these optimizations would significantly improve the scanner's performance, especially in large AWS environments with multiple regions and thousands of resources.