Use Cases
Celery Tasks
from celery import shared_task
from astami import AMIClient, AMIError
@shared_task
def reload_dialplan(server_host: str) -> bool:
try:
with AMIClient(server_host, 5038, "admin", "secret") as ami:
response = ami.command("dialplan reload")
return response.success
except AMIError as e:
logger.error(f"AMI error: {e}")
return False
Django Management Command
from django.core.management.base import BaseCommand
from astami import AMIClient
class Command(BaseCommand):
help = "Show Asterisk version"
def handle(self, *args, **options):
with AMIClient("localhost", 5038, "admin", "secret") as ami:
response = ami.command("core show version")
self.stdout.write(self.style.SUCCESS(response.output[0]))
FastAPI Application
from fastapi import FastAPI
from astami import AsyncAMIClient
app = FastAPI()
@app.get("/asterisk/version")
async def get_version():
async with AsyncAMIClient("localhost", 5038, "admin", "secret") as ami:
response = await ami.command("core show version")
return {"version": response.output[0] if response.output else "Unknown"}
Flask Application
from flask import Flask, jsonify
from astami import AMIClient
app = Flask(__name__)
@app.route("/asterisk/version")
def get_version():
with AMIClient("localhost", 5038, "admin", "secret") as ami:
response = ami.command("core show version")
return jsonify({"version": response.output[0] if response.output else "Unknown"})
Monitoring Script
#!/usr/bin/env python3
from astami import AMIClient, AMIError
import sys
def check_asterisk(host: str) -> int:
try:
with AMIClient(host, 5038, "admin", "secret") as ami:
response = ami.ping()
if response.success:
print(f"OK - Asterisk at {host} is responding")
return 0
else:
print(f"WARNING - Asterisk at {host} returned: {response.message}")
return 1
except AMIError as e:
print(f"CRITICAL - Cannot connect to Asterisk at {host}: {e}")
return 2
if __name__ == "__main__":
host = sys.argv[1] if len(sys.argv) > 1 else "localhost"
sys.exit(check_asterisk(host))
Batch Operations
from astami import AMIClient
servers = ["pbx1.example.com", "pbx2.example.com", "pbx3.example.com"]
for server in servers:
with AMIClient(server, 5038, "admin", "secret") as ami:
ami.reload("pjsip")
print(f"Reloaded PJSIP on {server}")