Antigravity-awesome-skills azure-servicebus-py
Azure Service Bus SDK for Python messaging. Use for queues, topics, subscriptions, and enterprise messaging patterns.
install
source · Clone the upstream repo
git clone https://github.com/sickn33/antigravity-awesome-skills
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/sickn33/antigravity-awesome-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/plugins/antigravity-awesome-skills-claude/skills/azure-servicebus-py" ~/.claude/skills/sickn33-antigravity-awesome-skills-azure-servicebus-py && rm -rf "$T"
manifest:
plugins/antigravity-awesome-skills-claude/skills/azure-servicebus-py/SKILL.md
safety · automated scan (low risk)
This is a pattern-based risk scan, not a security review. Our crawler flagged:
- pip install
Always read a skill's source content before installing. Patterns alone don't mean the skill is malicious — but they warrant attention.
source content
Azure Service Bus SDK for Python
Enterprise messaging for reliable cloud communication with queues and pub/sub topics.
Installation
pip install azure-servicebus azure-identity
Environment Variables
    SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
    SERVICEBUS_QUEUE_NAME=myqueue
    SERVICEBUS_TOPIC_NAME=mytopic
    SERVICEBUS_SUBSCRIPTION_NAME=mysubscription
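One way to consume these variables is to read them once at startup and fail early if the namespace is missing. The sketch below is a minimal illustration, not part of the SDK: `ServiceBusSettings` and `load_settings` are hypothetical names, and treating only the namespace as required is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ServiceBusSettings:
    """Hypothetical container for the environment variables listed above."""

    fully_qualified_namespace: str
    queue_name: Optional[str] = None
    topic_name: Optional[str] = None
    subscription_name: Optional[str] = None


def load_settings(env: Mapping[str, str] = os.environ) -> ServiceBusSettings:
    """Read Service Bus settings from a mapping (the process environment by default)."""
    namespace = env.get("SERVICEBUS_FULLY_QUALIFIED_NAMESPACE", "").strip()
    if not namespace:
        # Assumption: only the namespace is mandatory; entity names are optional.
        raise ValueError("SERVICEBUS_FULLY_QUALIFIED_NAMESPACE is not set")
    return ServiceBusSettings(
        fully_qualified_namespace=namespace,
        queue_name=env.get("SERVICEBUS_QUEUE_NAME"),
        topic_name=env.get("SERVICEBUS_TOPIC_NAME"),
        subscription_name=env.get("SERVICEBUS_SUBSCRIPTION_NAME"),
    )
```

Passing a plain dictionary instead of `os.environ` keeps the loader easy to test without touching the real environment.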
Authentication
    from azure.identity import DefaultAzureCredential
    from azure.servicebus import ServiceBusClient

    credential = DefaultAzureCredential()
    namespace = "<namespace>.servicebus.windows.net"

    client = ServiceBusClient(
        fully_qualified_namespace=namespace,
        credential=credential,
    )
Client Types
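| Client | Purpose | Get From |
|---|---|---|
| `ServiceBusClient` | Connection management | Direct instantiation |
| `ServiceBusSender` | Send messages | `client.get_queue_sender()` / `client.get_topic_sender()` |
| `ServiceBusReceiver` | Receive messages | `client.get_queue_receiver()` / `client.get_subscription_receiver()` |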
Send Messages (Async)
    import asyncio

    from azure.identity.aio import DefaultAzureCredential
    from azure.servicebus import ServiceBusMessage
    from azure.servicebus.aio import ServiceBusClient


    async def send_messages():
        credential = DefaultAzureCredential()
        async with ServiceBusClient(
            fully_qualified_namespace="<namespace>.servicebus.windows.net",
            credential=credential,
        ) as client:
            sender = client.get_queue_sender(queue_name="myqueue")
            async with sender:
                # Single message
                message = ServiceBusMessage("Hello, Service Bus!")
                await sender.send_messages(message)

                # Batch of messages
                messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
                await sender.send_messages(messages)

                # Message batch (for size control)
                batch = await sender.create_message_batch()
                for i in range(100):
                    try:
                        batch.add_message(ServiceBusMessage(f"Batch message {i}"))
                    except ValueError:
                        # Batch full: send it and start a new one with this message
                        await sender.send_messages(batch)
                        batch = await sender.create_message_batch()
                        batch.add_message(ServiceBusMessage(f"Batch message {i}"))
                await sender.send_messages(batch)


    asyncio.run(send_messages())
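The size-controlled batch loop above can be factored into a reusable helper. The following is a rough sketch under stated assumptions: `send_in_batches` is a hypothetical name, and the helper is duck-typed so that it relies only on the `create_message_batch()`, `add_message()`, and `send_messages()` calls shown above, with a full batch signalled by `ValueError` as in the original loop.

```python
import asyncio
from typing import Any, Iterable


async def send_in_batches(sender: Any, messages: Iterable[Any]) -> int:
    """Pack messages into size-limited batches and return how many batches were sent.

    `sender` is assumed to behave like the async sender used above.
    """
    batches_sent = 0
    pending = 0  # number of messages in the current, unsent batch
    batch = await sender.create_message_batch()
    for message in messages:
        try:
            batch.add_message(message)
        except ValueError:
            if pending == 0:
                # The message does not fit even into an empty batch.
                raise
            # Current batch is full: flush it and retry in a fresh batch.
            await sender.send_messages(batch)
            batches_sent += 1
            batch = await sender.create_message_batch()
            batch.add_message(message)
            pending = 0
        pending += 1
    if pending:
        # Flush the final, partially filled batch.
        await sender.send_messages(batch)
        batches_sent += 1
    return batches_sent
```

Unlike the inline loop, this version skips the final send when no messages were added and re-raises when a single message is too large on its own, which avoids sending an empty batch or looping on an oversized payload.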
Receive Messages (Async)
    import asyncio

    from azure.identity.aio import DefaultAzureCredential
    from azure.servicebus.aio import ServiceBusClient


    async def receive_messages():
        credential = DefaultAzureCredential()
        async with ServiceBusClient(
            fully_qualified_namespace="<namespace>.servicebus.windows.net",
            credential=credential,
        ) as client:
            receiver = client.get_queue_receiver(queue_name="myqueue")
            async with receiver:
                # Receive a batch, waiting at most 5 seconds for messages to arrive
                messages = await receiver.receive_messages(
                    max_message_count=10,
                    max_wait_time=5,  # seconds
                )
                for msg in messages:
                    print(f"Received: {str(msg)}")
                    await receiver.complete_message(msg)  # Remove from queue


    asyncio.run(receive_messages())
Receive Modes
| Mode | Behavior | Use Case |
|---|---|---|
| `PEEK_LOCK` (default) | Message locked, must complete/abandon | Reliable processing |
| `RECEIVE_AND_DELETE` | Removed immediately on receive | At-most-once delivery |
    from azure.servicebus import ServiceBusReceiveMode

    receiver = client.get_queue_receiver(
        queue_name="myqueue",
        receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
    )
Message Settlement
    # ProcessingError and PermanentError stand in for application-defined
    # exception types; they are not part of the SDK.
    async with receiver:
        messages = await receiver.receive_messages(max_message_count=1)
        for msg in messages:
            try:
                # Process message...
                await receiver.complete_message(msg)  # Success - remove from queue
            except ProcessingError:
                await receiver.abandon_message(msg)  # Retry later
            except PermanentError:
                await receiver.dead_letter_message(
                    msg,
                    reason="ProcessingFailed",
                    error_description="Could not process",
                )
| Action | Effect |
|---|---|
| `complete_message()` | Remove from queue (success) |
| `abandon_message()` | Release lock, retry immediately |
| `dead_letter_message()` | Move to dead-letter queue |
| `defer_message()` | Set aside, receive by sequence number |
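The choice between these actions can be kept in one small, testable function. The sketch below is illustrative only: `Settlement`, `choose_settlement`, the two exception classes, and the `max_attempts` threshold are all hypothetical. The `delivery_count` argument is meant to be read from the received message's `delivery_count` attribute.

```python
from enum import Enum
from typing import Optional


class ProcessingError(Exception):
    """Hypothetical application error for failures worth retrying."""


class PermanentError(Exception):
    """Hypothetical application error for failures that will never succeed."""


class Settlement(Enum):
    COMPLETE = "complete_message"
    ABANDON = "abandon_message"
    DEAD_LETTER = "dead_letter_message"


def choose_settlement(
    error: Optional[Exception],
    delivery_count: int,
    max_attempts: int = 5,  # assumption: an application-level retry budget
) -> Settlement:
    """Map a processing outcome to one of the settlement actions in the table."""
    if error is None:
        return Settlement.COMPLETE
    if isinstance(error, PermanentError) or delivery_count >= max_attempts:
        # Retrying cannot help, or the retry budget is used up.
        return Settlement.DEAD_LETTER
    return Settlement.ABANDON
```

Each enum value matches the name of the receiver method to call, so a handler could look the method up with `getattr(receiver, decision.value)`. Deferral is left out because it needs the application to track sequence numbers.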
Topics and Subscriptions
    # Send to topic
    sender = client.get_topic_sender(topic_name="mytopic")
    async with sender:
        await sender.send_messages(ServiceBusMessage("Topic message"))

    # Receive from subscription
    receiver = client.get_subscription_receiver(
        topic_name="mytopic",
        subscription_name="mysubscription",
    )
    async with receiver:
        messages = await receiver.receive_messages(max_message_count=10)
Sessions (FIFO)
    from azure.servicebus import NEXT_AVAILABLE_SESSION

    # Send with session
    message = ServiceBusMessage("Session message")
    message.session_id = "order-123"
    await sender.send_messages(message)

    # Receive from specific session
    receiver = client.get_queue_receiver(
        queue_name="session-queue",
        session_id="order-123",
    )

    # Receive from next available session
    receiver = client.get_queue_receiver(
        queue_name="session-queue",
        session_id=NEXT_AVAILABLE_SESSION,
    )
Scheduled Messages
    from datetime import datetime, timedelta, timezone

    message = ServiceBusMessage("Scheduled message")
    scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=10)

    # Schedule message (returns a list of sequence numbers, one per message)
    sequence_numbers = await sender.schedule_messages(message, scheduled_time)

    # Cancel scheduled message
    await sender.cancel_scheduled_messages(sequence_numbers)
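The example above builds the schedule time from a timezone-aware UTC timestamp. A small helper can enforce that convention before a time reaches `schedule_messages`. This is a minimal sketch with hypothetical names (`to_schedule_time_utc`, `schedule_time_after`); rejecting naive datetimes outright is a design assumption, not SDK behavior.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def to_schedule_time_utc(when: datetime) -> datetime:
    """Return `when` converted to UTC, refusing datetimes without a timezone."""
    if when.tzinfo is None or when.utcoffset() is None:
        # A naive datetime is ambiguous: it could be local time or UTC.
        raise ValueError("schedule time must be timezone-aware")
    return when.astimezone(timezone.utc)


def schedule_time_after(delay: timedelta, now: Optional[datetime] = None) -> datetime:
    """Return the UTC time `delay` after `now` (current time when omitted)."""
    if delay < timedelta(0):
        raise ValueError("delay must not be negative")
    base = to_schedule_time_utc(now) if now is not None else datetime.now(timezone.utc)
    return base + delay
```

With this in place, `schedule_time_after(timedelta(minutes=10))` yields the same kind of value as `scheduled_time` in the snippet above.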
Dead-Letter Queue
    from azure.servicebus import ServiceBusSubQueue

    # Receive from dead-letter queue
    dlq_receiver = client.get_queue_receiver(
        queue_name="myqueue",
        sub_queue=ServiceBusSubQueue.DEAD_LETTER,
    )

    async with dlq_receiver:
        messages = await dlq_receiver.receive_messages(max_message_count=10)
        for msg in messages:
            print(f"Dead-lettered: {msg.dead_letter_reason}")
            await dlq_receiver.complete_message(msg)
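When monitoring a dead-letter queue, it can help to tally messages by reason before deciding what to reprocess. The helper below is a sketch: `summarize_dead_letters` is a hypothetical name, and it only assumes that each message exposes the `dead_letter_reason` attribute used above.

```python
from collections import Counter
from typing import Any, Iterable


def summarize_dead_letters(messages: Iterable[Any]) -> Counter:
    """Count dead-lettered messages by their `dead_letter_reason`."""
    return Counter(
        # Messages without a recorded reason are grouped under a placeholder.
        getattr(message, "dead_letter_reason", None) or "<no reason>"
        for message in messages
    )
```

Calling `summarize_dead_letters(messages).most_common()` on a batch received from the dead-letter queue lists the most frequent failure reasons first.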
Sync Client (for simple scripts)
    from azure.identity import DefaultAzureCredential
    from azure.servicebus import ServiceBusClient, ServiceBusMessage

    with ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=DefaultAzureCredential(),
    ) as client:
        with client.get_queue_sender("myqueue") as sender:
            sender.send_messages(ServiceBusMessage("Sync message"))

        # max_wait_time ends the iteration once no message arrives for 5 seconds;
        # without it, the loop below would block indefinitely.
        with client.get_queue_receiver("myqueue", max_wait_time=5) as receiver:
            for msg in receiver:
                print(str(msg))
                receiver.complete_message(msg)
Best Practices
- Use async client for production workloads
- Use context managers (`async with`) for proper cleanup
- Complete messages after successful processing
- Use dead-letter queue for poison messages
- Use sessions for ordered, FIFO processing
- Use message batches for high-throughput scenarios
- Set `max_wait_time` to avoid infinite blocking
Reference Files
| File | Contents |
|---|---|
| references/patterns.md | Competing consumers, sessions, retry patterns, request-response, transactions |
| references/dead-letter.md | DLQ handling, poison messages, reprocessing strategies |
| scripts/setup_servicebus.py | CLI for queue/topic/subscription management and DLQ monitoring |
When to Use
Use this skill to carry out the workflows and actions described in the overview.
Limitations
- Use this skill only when the task clearly matches the scope described above.
- Do not treat the output as a substitute for environment-specific validation, testing, or expert review.
- Stop and ask for clarification if required inputs, permissions, safety boundaries, or success criteria are missing.