Trending-skills ministack-aws-emulator
MiniStack is a free, open-source local AWS emulator (LocalStack replacement) that emulates 25+ AWS services on a single port with no account or license required.
install
source · Clone the upstream repo
git clone https://github.com/Aradotso/trending-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/Aradotso/trending-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/ministack-aws-emulator" ~/.claude/skills/aradotso-trending-skills-ministack-aws-emulator && rm -rf "$T"
manifest:
skills/ministack-aws-emulator/SKILL.md
MiniStack AWS Emulator
Skill by ara.so — Daily 2026 Skills collection.
MiniStack is a free, MIT-licensed drop-in replacement for LocalStack that emulates 25+ AWS services (S3, SQS, DynamoDB, Lambda, SNS, IAM, STS, Kinesis, EventBridge, SecretsManager, SSM, CloudWatch, SES, and more) on a single port (4566). No account, no API key, no telemetry. Works with boto3, AWS CLI, Terraform, CDK, and any SDK.
Installation
Option 1: PyPI (simplest)
```bash
pip install ministack
ministack
# Server runs at http://localhost:4566
# Change port: GATEWAY_PORT=5000 ministack
```
Option 2: Docker Hub
docker run -p 4566:4566 nahuelnucera/ministack
Option 3: Docker Compose (from source)
```bash
git clone https://github.com/Nahuel990/ministack
cd ministack
docker compose up -d
```
Verify it's running
curl http://localhost:4566/_localstack/health
Configuration
| Environment Variable | Default | Description |
|---|---|---|
| `GATEWAY_PORT` | `4566` | Port to listen on |
| `S3_PERSIST` | unset | Set to `1` to persist S3 data to disk |
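For example, both variables can be set on one command line (a minimal sketch; the port value is illustrative):

```bash
# Listen on port 5000 and keep S3 objects on disk across restarts
GATEWAY_PORT=5000 S3_PERSIST=1 ministack
```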
AWS CLI Usage
```bash
# Set credentials (any non-empty values work)
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
export AWS_DEFAULT_REGION=us-east-1

# S3
aws --endpoint-url=http://localhost:4566 s3 mb s3://my-bucket
aws --endpoint-url=http://localhost:4566 s3 cp ./file.txt s3://my-bucket/
aws --endpoint-url=http://localhost:4566 s3 ls s3://my-bucket

# SQS
aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name my-queue
aws --endpoint-url=http://localhost:4566 sqs list-queues

# DynamoDB
aws --endpoint-url=http://localhost:4566 dynamodb list-tables
aws --endpoint-url=http://localhost:4566 dynamodb create-table \
  --table-name Users \
  --attribute-definitions AttributeName=userId,AttributeType=S \
  --key-schema AttributeName=userId,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST

# STS (identity check)
aws --endpoint-url=http://localhost:4566 sts get-caller-identity

# Use a named profile instead
aws configure --profile local
# Enter: test / test / us-east-1 / json
aws --profile local --endpoint-url=http://localhost:4566 s3 ls
```
awslocal wrapper (from source)
```bash
chmod +x bin/awslocal
./bin/awslocal s3 ls
./bin/awslocal dynamodb list-tables
```
boto3 Usage Patterns
Universal client factory
```python
import boto3

ENDPOINT = "http://localhost:4566"

def aws_client(service: str):
    return boto3.client(
        service,
        endpoint_url=ENDPOINT,
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )

def aws_resource(service: str):
    return boto3.resource(
        service,
        endpoint_url=ENDPOINT,
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )
```
S3
```python
s3 = aws_client("s3")

# Create bucket and upload
s3.create_bucket(Bucket="my-bucket")
s3.put_object(Bucket="my-bucket", Key="hello.txt", Body=b"Hello, MiniStack!")

# Download
obj = s3.get_object(Bucket="my-bucket", Key="hello.txt")
print(obj["Body"].read())  # b'Hello, MiniStack!'

# List objects
response = s3.list_objects_v2(Bucket="my-bucket")
for item in response.get("Contents", []):
    print(item["Key"])

# Copy object
s3.copy_object(
    Bucket="my-bucket",
    CopySource={"Bucket": "my-bucket", "Key": "hello.txt"},
    Key="hello-copy.txt",
)

# Enable versioning
s3.put_bucket_versioning(
    Bucket="my-bucket",
    VersioningConfiguration={"Status": "Enabled"},
)

# Presigned URL (works locally)
url = s3.generate_presigned_url(
    "get_object",
    Params={"Bucket": "my-bucket", "Key": "hello.txt"},
    ExpiresIn=3600,
)
```
SQS
```python
import json

sqs = aws_client("sqs")

# Standard queue
queue = sqs.create_queue(QueueName="my-queue")
queue_url = queue["QueueUrl"]

sqs.send_message(QueueUrl=queue_url, MessageBody='{"event": "user_signup"}')

messages = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)
for msg in messages.get("Messages", []):
    print(msg["Body"])
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])

# FIFO queue
fifo = sqs.create_queue(
    QueueName="my-queue.fifo",
    Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},
)

# Dead-letter queue setup
dlq = sqs.create_queue(QueueName="my-dlq")
dlq_attrs = sqs.get_queue_attributes(
    QueueUrl=dlq["QueueUrl"], AttributeNames=["QueueArn"]
)
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_attrs["Attributes"]["QueueArn"],
            "maxReceiveCount": "3",
        })
    },
)
```
DynamoDB
```python
ddb = aws_client("dynamodb")

# Create table
ddb.create_table(
    TableName="Users",
    KeySchema=[
        {"AttributeName": "userId", "KeyType": "HASH"},
        {"AttributeName": "createdAt", "KeyType": "RANGE"},
    ],
    AttributeDefinitions=[
        {"AttributeName": "userId", "AttributeType": "S"},
        {"AttributeName": "createdAt", "AttributeType": "N"},
    ],
    BillingMode="PAY_PER_REQUEST",
)

# Put / Get / Delete
ddb.put_item(
    TableName="Users",
    Item={
        "userId": {"S": "u1"},
        "createdAt": {"N": "1700000000"},
        "name": {"S": "Alice"},
        "active": {"BOOL": True},
    },
)
item = ddb.get_item(
    TableName="Users",
    Key={"userId": {"S": "u1"}, "createdAt": {"N": "1700000000"}},
)
print(item["Item"]["name"]["S"])  # Alice

# Query
result = ddb.query(
    TableName="Users",
    KeyConditionExpression="userId = :uid",
    ExpressionAttributeValues={":uid": {"S": "u1"}},
)

# Batch write
ddb.batch_write_item(
    RequestItems={
        "Users": [
            {"PutRequest": {"Item": {
                "userId": {"S": "u2"},
                "createdAt": {"N": "1700000001"},
                "name": {"S": "Bob"},
            }}},
        ]
    }
)

# TTL
ddb.update_time_to_live(
    TableName="Users",
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "expiresAt"},
)
```
SNS + SQS Fanout
```python
sns = aws_client("sns")
sqs = aws_client("sqs")

topic = sns.create_topic(Name="my-topic")
topic_arn = topic["TopicArn"]

queue = sqs.create_queue(QueueName="fan-queue")
queue_attrs = sqs.get_queue_attributes(
    QueueUrl=queue["QueueUrl"], AttributeNames=["QueueArn"]
)
queue_arn = queue_attrs["Attributes"]["QueueArn"]

sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)

# Publish — message is fanned out to subscribed SQS queues
sns.publish(TopicArn=topic_arn, Message="hello fanout", Subject="test")
```
Lambda
```python
import io
import json
import zipfile

# Create a zip with handler code
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("handler.py", """
def handler(event, context):
    print("event:", event)
    return {"statusCode": 200, "body": "ok"}
""")
buf.seek(0)

lam = aws_client("lambda")
lam.create_function(
    FunctionName="my-function",
    Runtime="python3.12",
    Role="arn:aws:iam::000000000000:role/role",
    Handler="handler.handler",
    Code={"ZipFile": buf.read()},
)

# Invoke synchronously
response = lam.invoke(
    FunctionName="my-function",
    InvocationType="RequestResponse",
    Payload=json.dumps({"key": "value"}),
)
result = json.loads(response["Payload"].read())
print(result)  # {"statusCode": 200, "body": "ok"}

# SQS event source mapping
lam.create_event_source_mapping(
    EventSourceArn=queue_arn,
    FunctionName="my-function",
    BatchSize=10,
    Enabled=True,
)
```
SecretsManager
```python
sm = aws_client("secretsmanager")

sm.create_secret(Name="db-password", SecretString='{"password":"s3cr3t"}')

secret = sm.get_secret_value(SecretId="db-password")
print(secret["SecretString"])  # {"password":"s3cr3t"}

sm.update_secret(SecretId="db-password", SecretString='{"password":"newpass"}')
sm.delete_secret(SecretId="db-password", ForceDeleteWithoutRecovery=True)
```
SSM Parameter Store
```python
ssm = aws_client("ssm")

ssm.put_parameter(Name="/app/db/host", Value="localhost", Type="String")
ssm.put_parameter(Name="/app/db/password", Value="secret", Type="SecureString")

param = ssm.get_parameter(Name="/app/db/host")
print(param["Parameter"]["Value"])  # localhost

# Fetch all params under a path
params = ssm.get_parameters_by_path(Path="/app/", Recursive=True)
for p in params["Parameters"]:
    print(p["Name"], p["Value"])
```
Kinesis
```python
kin = aws_client("kinesis")

kin.create_stream(StreamName="events", ShardCount=1)
kin.put_record(StreamName="events", Data=b'{"event":"click"}', PartitionKey="user1")

# Get records
shards = kin.list_shards(StreamName="events")
shard_id = shards["Shards"][0]["ShardId"]
iterator = kin.get_shard_iterator(
    StreamName="events",
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)
records = kin.get_records(ShardIterator=iterator["ShardIterator"])
for r in records["Records"]:
    print(r["Data"])  # boto3 returns the record blob already base64-decoded
```
EventBridge
```python
eb = aws_client("events")

# Create a custom bus
eb.create_event_bus(Name="my-bus")

# Put a rule targeting a Lambda
eb.put_rule(
    Name="my-rule",
    EventBusName="my-bus",
    EventPattern='{"source": ["myapp"]}',
    State="ENABLED",
)
eb.put_targets(
    Rule="my-rule",
    EventBusName="my-bus",
    Targets=[{"Id": "1", "Arn": "arn:aws:lambda:us-east-1:000000000000:function:my-function"}],
)

# Emit an event (triggers Lambda target)
eb.put_events(Entries=[{
    "Source": "myapp",
    "DetailType": "UserSignup",
    "Detail": '{"userId": "123"}',
    "EventBusName": "my-bus",
}])
```
CloudWatch Logs
```python
import time

logs = aws_client("logs")

logs.create_log_group(logGroupName="/app/service")
logs.create_log_stream(logGroupName="/app/service", logStreamName="stream-1")

logs.put_log_events(
    logGroupName="/app/service",
    logStreamName="stream-1",
    logEvents=[
        {"timestamp": int(time.time() * 1000), "message": "App started"},
        {"timestamp": int(time.time() * 1000), "message": "Request received"},
    ],
)

events = logs.get_log_events(
    logGroupName="/app/service",
    logStreamName="stream-1",
)
for e in events["events"]:
    print(e["message"])

# Filter with glob patterns (* and ?), AND terms, -exclusions
filtered = logs.filter_log_events(
    logGroupName="/app/service",
    filterPattern="Request*",
)
```
Testing Patterns
pytest fixture (recommended)
```python
import pytest
import boto3

MINISTACK_ENDPOINT = "http://localhost:4566"

@pytest.fixture(scope="session")
def aws_endpoint():
    return MINISTACK_ENDPOINT

@pytest.fixture
def s3_client(aws_endpoint):
    return boto3.client(
        "s3",
        endpoint_url=aws_endpoint,
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )

@pytest.fixture
def test_bucket(s3_client):
    bucket = "test-bucket"
    s3_client.create_bucket(Bucket=bucket)
    yield bucket
    # Cleanup
    objs = s3_client.list_objects_v2(Bucket=bucket).get("Contents", [])
    for obj in objs:
        s3_client.delete_object(Bucket=bucket, Key=obj["Key"])
    s3_client.delete_bucket(Bucket=bucket)

def test_upload_download(s3_client, test_bucket):
    s3_client.put_object(Bucket=test_bucket, Key="test.txt", Body=b"hello")
    resp = s3_client.get_object(Bucket=test_bucket, Key="test.txt")
    assert resp["Body"].read() == b"hello"
```
GitHub Actions CI integration
```yaml
# .github/workflows/test.yml
name: Test
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      ministack:
        image: nahuelnucera/ministack
        ports:
          - 4566:4566
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - run: pip install -r requirements.txt
      - run: pytest
        env:
          AWS_ACCESS_KEY_ID: test
          AWS_SECRET_ACCESS_KEY: test
          AWS_DEFAULT_REGION: us-east-1
          AWS_ENDPOINT_URL: http://localhost:4566
```
Using AWS_ENDPOINT_URL env var (boto3 >= 1.28)
```python
import boto3

# If AWS_ENDPOINT_URL is set, boto3 uses it automatically — no endpoint_url kwarg needed
# export AWS_ENDPOINT_URL=http://localhost:4566
s3 = boto3.client("s3")  # picks up AWS_ENDPOINT_URL automatically
```
Supported Services (25+)
| Service | Key Operations |
|---|---|
| S3 | CRUD, multipart, versioning, encryption, lifecycle, CORS, ACL, notifications |
| SQS | Standard & FIFO queues, DLQ, batch ops |
| SNS | Topics, subscriptions, fanout to SQS/Lambda, platform endpoints |
| DynamoDB | Tables, CRUD, Query, Scan, TTL, transactions, batch ops |
| Lambda | Python runtimes, invoke, SQS event sources, Function URLs |
| IAM | Users, roles, policies, groups, instance profiles, OIDC |
| STS | GetCallerIdentity, AssumeRole, GetSessionToken |
| SecretsManager | Full CRUD, rotation, versioning |
| SSM Parameter Store | String, SecureString, StringList, path queries |
| EventBridge | Buses, rules, targets, Lambda dispatch |
| Kinesis | Streams, shards, records, iterators |
| CloudWatch Metrics | PutMetricData, alarms, dashboards, CBOR protocol |
| CloudWatch Logs | Log groups/streams, filter with globs, metric filters |
| SES | Send email, templates, configuration sets |
| Step Functions | State machine CRUD |
| RDS | Spins up real Postgres/MySQL containers |
| ElastiCache | Spins up real Redis containers |
| Athena | Real SQL via DuckDB |
| ECS | Real Docker containers |
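SES appears in the table but has no example elsewhere in this skill. Below is a minimal, hedged sketch using the `aws_client` factory from above; the addresses are illustrative, and whether MiniStack enforces identity verification before sending is an assumption.

```python
ses = aws_client("ses")

# Register a sender identity, then send a plain-text message
ses.verify_email_identity(EmailAddress="noreply@example.com")
ses.send_email(
    Source="noreply@example.com",
    Destination={"ToAddresses": ["user@example.com"]},
    Message={
        "Subject": {"Data": "Welcome"},
        "Body": {"Text": {"Data": "Hello from MiniStack"}},
    },
)
```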
Troubleshooting
Connection refused on port 4566
```bash
# Check if ministack is running
curl http://localhost:4566/_localstack/health

# Start it
ministack
# or
docker run -p 4566:4566 nahuelnucera/ministack
```
NoCredentialsError from boto3
```bash
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
export AWS_DEFAULT_REGION=us-east-1
# Any non-empty values work — MiniStack doesn't validate credentials
```
InvalidSignatureException
- This is usually a region mismatch. Ensure `region_name="us-east-1"` matches across all clients (example below).
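A minimal sketch of pinning one region constant for every client (the shared `aws_client` factory shown earlier achieves the same thing):

```python
import boto3

ENDPOINT = "http://localhost:4566"
REGION = "us-east-1"  # one shared region so request signing stays consistent

s3 = boto3.client("s3", endpoint_url=ENDPOINT, region_name=REGION,
                  aws_access_key_id="test", aws_secret_access_key="test")
sqs = boto3.client("sqs", endpoint_url=ENDPOINT, region_name=REGION,
                   aws_access_key_id="test", aws_secret_access_key="test")
```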
Lambda function not found after create
- MiniStack executes Python runtimes with a warm worker pool. Wait briefly or invoke with `InvocationType="Event"` for async (example below).
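A hedged sketch of the async invocation, assuming the `lam` client and `my-function` from the Lambda section above:

```python
import json

# Fire-and-forget: the call returns immediately instead of waiting for the handler
resp = lam.invoke(
    FunctionName="my-function",
    InvocationType="Event",
    Payload=json.dumps({"key": "value"}),
)
print(resp["StatusCode"])  # async invokes normally report 202
```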
S3 data lost on restart
```bash
# Enable persistence
S3_PERSIST=1 ministack

# or in Docker
docker run -p 4566:4566 -e S3_PERSIST=1 -v $(pwd)/data:/data nahuelnucera/ministack
```
Port conflict
```bash
GATEWAY_PORT=5000 ministack
# Then use http://localhost:5000 as the endpoint
```
Migrating from LocalStack
- Endpoint URLs stay the same — keep using `http://localhost:4566`.
- Remove `LOCALSTACK_API_KEY` / `LOCALSTACK_AUTH_TOKEN` env vars (not needed).
- Replace the `localstack/localstack` Docker image with `nahuelnucera/ministack` (sanity-check sketch below).
- All `boto3` client code works without modification.
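As a quick sanity check after the image swap, the same boto3 call that worked against LocalStack should work unchanged (a minimal sketch using the dummy credentials from this skill):

```python
import boto3

# Identical to code written for LocalStack; only the container behind port 4566 changed
sts = boto3.client(
    "sts",
    endpoint_url="http://localhost:4566",
    aws_access_key_id="test",
    aws_secret_access_key="test",
    region_name="us-east-1",
)
print(sts.get_caller_identity())  # should return an emulated account identity
```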