Commit 0bdcd212 authored by Prasad Vittaldev's avatar Prasad Vittaldev

Initial commit

parents
Pipeline #10177 failed with stages
.venv/
__pycache__/
*.py[cod]
.env
*.log
.DS_Store
FROM python:3.10-slim
WORKDIR /app
# Install system dependencies if needed
# RUN apt-get update && apt-get install -y ...
# Copy project configuration and source first so editable install sees the package
COPY pyproject.toml .
COPY README.md .
COPY src/ src/
# Install dependencies in editable mode for dev convenience
RUN pip install --no-cache-dir -e .
# Set python path to include src
ENV PYTHONPATH=/app/src
# Default command
CMD ["python", "-m", "src.server"]
# Communication MCP Middleware Server
This is an MCP server that acts as a middleware for the Ivrnet Message API.
## Features
- Voice, Text, and Email job management.
- Voice file management.
- List management (bounce list, opt-in list).
- Error reporting.
- Support for multiple environments (Production CA, Production US, Test).
## Usage
### Docker
```bash
docker-compose up --build
```
Once running, the MCP server is available at `http://localhost:8000/sse` (see `langflow_config.json`).
### Local Development
```bash
pip install -e .
python src/server.py
```
### Connect from Langflow
Import `langflow_config.json` into Langflow (or copy its values) so Langflow knows to talk to `http://localhost:8000/sse`.
### Testing
Install dev tooling and run pytest:
```bash
pip install -e .[dev]
pytest
```
version: '3.8'
services:
app:
build: .
volumes:
- .:/app
environment:
- PYTHONPATH=/app/src
# Keep the container running for development if needed, or run the server
# For development, you might want to run tests or a shell
ports:
- "8000:8000"
# command: tail -f /dev/null
command: uvicorn src.server:app --host 0.0.0.0 --port 8000
stdin_open: true # docker run -i
tty: true # docker run -t
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("test")
print("Attributes:", dir(mcp))
try:
app = mcp.sse_app()
print("sse_app() Result Type:", type(app))
except Exception as e:
print("Error calling sse_app():", e)
{
"name": "communication-mcp",
"url": "http://localhost:8000/sse"
}
This diff is collapsed.
[project]
name = "communication-mcp"
version = "0.1.0"
description = "MCP middleware for Ivrnet Message API"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"mcp>=0.1.0",
"httpx>=0.27.0",
"pydantic>=2.0.0",
"python-dotenv>=1.0.0",
"uvicorn>=0.20.0",
]
[project.optional-dependencies]
dev = ["pytest"]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src"]
import httpx
from typing import Optional, Dict, Any
from .models import Environment
ENV_URLS = {
Environment.PRODUCTION_CA: "https://message.ivrnet.com/api/v1",
Environment.PRODUCTION_US: "https://message.us.ivrnet.com/api/v1",
Environment.TEST: "https://message-staging.ivrnet.com/api/v1",
}
class IvrnetClient:
def __init__(
self,
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
):
self.base_url = ENV_URLS[environment]
self.headers = {"Content-Type": "application/json"}
if api_key:
self.headers["X-API-KEY"] = api_key
elif basic_auth:
self.headers["Authorization"] = basic_auth
elif username and password:
import base64
credentials = f"{username}:{password}"
encoded = base64.b64encode(credentials.encode()).decode()
self.headers["Authorization"] = f"Basic {encoded}"
else:
raise ValueError("Authentication is required: provide api_key or username/password or basic_auth")
async def request(self, method: str, endpoint: str, data: Optional[Dict[str, Any]] = None, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
url = f"{self.base_url}{endpoint}"
async with httpx.AsyncClient() as client:
response = await client.request(method, url, headers=self.headers, json=data, params=params)
try:
response.raise_for_status()
except httpx.HTTPStatusError as e:
# Try to parse error message from body
try:
error_body = response.json()
raise Exception(f"API Error: {e.response.status_code} - {error_body}")
except:
raise e
if response.status_code == 204:
return {}
return response.json()
async def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
return await self.request("POST", endpoint, data=data)
async def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return await self.request("GET", endpoint, params=params)
async def put(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
return await self.request("PUT", endpoint, data=data)
async def delete(self, endpoint: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return await self.request("DELETE", endpoint, data=data)
from enum import Enum
from typing import List, Optional, Dict, Any, Union
from pydantic import BaseModel, Field
class Environment(str, Enum):
PRODUCTION_CA = "production-ca"
PRODUCTION_US = "production-us"
TEST = "test"
class JobStatus(str, Enum):
SCHEDULED = "scheduled"
RUNNING = "running"
COMPLETED = "completed"
EXPIRED = "expired"
PAUSED = "paused"
CANCELLED = "cancelled"
ALL = "all"
class Priority(str, Enum):
EMERGENCY = "emergency"
URGENT = "urgent"
NORMAL = "normal"
LOW = "low"
TEST = "test"
class MessageType(str, Enum):
TTS = "tts"
VOICE = "voice"
HYBRID = "hybrid"
PLAINTEXT = "plaintext"
HTML = "html"
class WebhookType(BaseModel):
scheduled: Optional[bool] = None
running: Optional[bool] = None
completed: Optional[bool] = None
expired: Optional[bool] = None
class ResponseVariables(BaseModel):
# Dynamic keys, but we can define common ones or use Dict
pass
class WebhookTypes(BaseModel):
jobstatus: Optional[WebhookType] = None
responsevariables: Optional[Dict[str, bool]] = None
class Retry(BaseModel):
attempts: Optional[int] = 3
interval: Optional[int] = 15
interval_list: Optional[List[int]] = None
statuscode_list: Optional[List[int]] = None
class WebhookAuth(BaseModel):
apikey: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
basicauth: Optional[str] = None
class Webhook(BaseModel):
url: str
auth: Optional[WebhookAuth] = None
method: Optional[str] = "POST"
retry: Optional[Retry] = None
types: Optional[WebhookTypes] = None
class JobOptions(BaseModel):
webhook: Optional[Webhook] = None
class FromResource(BaseModel):
resource: str
resourcename: Optional[str] = None
class ToResource(BaseModel):
resourceid: Optional[str] = None
resource: str
resourcename: Optional[str] = None
languagename: str
variables: Optional[Dict[str, str]] = None
metadata: Optional[Dict[str, str]] = None
class MessageVariableTags(BaseModel):
opentag: str
closetag: str
class EmailAttachment(BaseModel):
filename: str
filebase64data: str
class BodyOptions(BaseModel):
messagetype: Optional[MessageType] = None
messagevariabletags: Optional[MessageVariableTags] = None
ttsmessagevoice: Optional[str] = None # female or male
responsemessageprompt: Optional[Dict[str, str]] = None
responsesuccessmessage: Optional[Dict[str, str]] = None
responseerrormessage: Optional[Dict[str, str]] = None
responsevariables: Optional[Dict[str, str]] = None
responsevariablesactions: Optional[Dict[str, str]] = None
responsevariablesmessages: Optional[Dict[str, Dict[str, str]]] = None
hangupmessage: Optional[Dict[str, str]] = None
transfermessage: Optional[Dict[str, str]] = None
jumpmessage: Optional[Dict[str, str]] = None
languagetranslate: Optional[bool] = False
emailattachments: Optional[List[EmailAttachment]] = None
class Body(BaseModel):
message: Dict[str, str]
voicemailmessage: Optional[Dict[str, str]] = None
subject: Optional[Dict[str, str]] = None # For email
options: Optional[BodyOptions] = None
class Schedule(BaseModel):
priority: Optional[Priority] = Priority.NORMAL
start: str # ISO8601
end: str # ISO8601
retryattempts: Optional[int] = None
retryinterval: Optional[int] = None
class CreateJobRequest(BaseModel):
job: Optional[Dict[str, Any]] = None # name, metadata, options
from_: FromResource = Field(..., alias="from")
to: List[ToResource]
body: Body
schedule: Schedule
class JobResponse(BaseModel):
jobid: Optional[str] = None
payloadResponse: Optional[Dict[str, Any]] = None
payloadValidation: Optional[Dict[str, Any]] = None
class VoiceFileResponse(BaseModel):
tollfree: Optional[str] = None
accesscode: Optional[str] = None
fileid: Optional[str] = None
filename: Optional[str] = None
from mcp.server.fastmcp import FastMCP
from .tools.jobs import (
create_voice_job,
create_text_job,
create_email_job,
get_job,
list_jobs,
cancel_job,
pause_resume_job,
send_followup_message,
)
from .tools.voice_files import (
create_voice_file_placeholder,
list_voice_files,
delete_voice_file,
)
from .tools.lists import (
clear_email_bounce_list,
upload_text_optin_list,
)
from .tools.errors import (
get_job_errors,
get_webhook_errors,
)
mcp = FastMCP("communication-mcp")
# Register tools
mcp.tool()(create_voice_job)
mcp.tool()(create_text_job)
mcp.tool()(create_email_job)
mcp.tool()(get_job)
mcp.tool()(list_jobs)
mcp.tool()(cancel_job)
mcp.tool()(pause_resume_job)
mcp.tool()(send_followup_message)
mcp.tool()(create_voice_file_placeholder)
mcp.tool()(list_voice_files)
mcp.tool()(delete_voice_file)
mcp.tool()(clear_email_bounce_list)
mcp.tool()(upload_text_optin_list)
mcp.tool()(get_job_errors)
mcp.tool()(get_webhook_errors)
# Expose the ASGI app for uvicorn
app = mcp.sse_app()
def main():
mcp.run()
if __name__ == "__main__":
main()
from typing import Optional, List, Dict, Any
from ..api import IvrnetClient
from ..models import Environment
def get_client(
environment: Environment,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> IvrnetClient:
return IvrnetClient(environment, api_key, username, password, basic_auth)
async def get_job_errors(
job_type: str, # voice, text, email
days: int = 30,
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> Dict[str, Any]:
"""Get job errors."""
client = get_client(environment, api_key, username, password, basic_auth)
params = {"days": days}
return await client.get(f"/{job_type}/errors/", params=params)
async def get_webhook_errors(
job_type: str, # voice, text, email
days: int = 30,
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> Dict[str, Any]:
"""Get webhook errors."""
client = get_client(environment, api_key, username, password, basic_auth)
params = {"days": days}
return await client.get(f"/{job_type}/webhook/errors/", params=params)
from typing import Optional, List, Dict, Any
from ..api import IvrnetClient
from ..models import Environment, CreateJobRequest, JobStatus
def get_client(
environment: Environment,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> IvrnetClient:
return IvrnetClient(environment, api_key, username, password, basic_auth)
async def create_voice_job(
from_resource: str,
to_resource: str,
languagename: str,
message: Dict[str, str],
start_time: str,
end_time: str,
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
job_name: Optional[str] = None,
job_metadata: Optional[Dict[str, str]] = None,
webhook_url: Optional[str] = None,
from_resourcename: Optional[str] = None,
to_resourceid: Optional[str] = None,
to_resourcename: Optional[str] = None,
to_variables: Optional[Dict[str, str]] = None,
to_metadata: Optional[Dict[str, str]] = None,
voicemail_message: Optional[Dict[str, str]] = None,
priority: Optional[str] = "normal",
retry_attempts: Optional[int] = 1,
retry_interval: Optional[int] = 15,
) -> Dict[str, Any]:
"""Create a voice job."""
client = get_client(environment, api_key, username, password, basic_auth)
payload = {
"job": {
"name": job_name,
"metadata": job_metadata,
},
"from": {
"resource": from_resource,
"resourcename": from_resourcename,
},
"to": [
{
"resourceid": to_resourceid,
"resource": to_resource,
"resourcename": to_resourcename,
"languagename": languagename,
"variables": to_variables,
"metadata": to_metadata,
}
],
"body": {
"message": message,
"voicemailmessage": voicemail_message,
},
"schedule": {
"start": start_time,
"end": end_time,
"priority": priority,
"retryattempts": retry_attempts,
"retryinterval": retry_interval,
}
}
if webhook_url:
payload["job"]["options"] = {"webhook": {"url": webhook_url}}
return await client.post("/voice/", payload)
async def create_text_job(
from_resource: str,
to_resource: str,
languagename: str,
message: Dict[str, str],
start_time: str,
end_time: str,
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
job_name: Optional[str] = None,
job_metadata: Optional[Dict[str, str]] = None,
webhook_url: Optional[str] = None,
to_resourceid: Optional[str] = None,
to_resourcename: Optional[str] = None,
to_variables: Optional[Dict[str, str]] = None,
to_metadata: Optional[Dict[str, str]] = None,
priority: Optional[str] = "normal",
retry_attempts: Optional[int] = 1,
retry_interval: Optional[int] = 15,
) -> Dict[str, Any]:
"""Create a text job."""
client = get_client(environment, api_key, username, password, basic_auth)
payload = {
"job": {
"name": job_name,
"metadata": job_metadata,
},
"from": {
"resource": from_resource,
},
"to": [
{
"resourceid": to_resourceid,
"resource": to_resource,
"resourcename": to_resourcename,
"languagename": languagename,
"variables": to_variables,
"metadata": to_metadata,
}
],
"body": {
"message": message,
},
"schedule": {
"start": start_time,
"end": end_time,
"priority": priority,
"retryattempts": retry_attempts,
"retryinterval": retry_interval,
}
}
if webhook_url:
payload["job"]["options"] = {"webhook": {"url": webhook_url}}
return await client.post("/text/", payload)
async def create_email_job(
from_resource: str,
to_resource: str,
languagename: str,
message: Dict[str, str],
subject: Dict[str, str],
start_time: str,
end_time: str,
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
job_name: Optional[str] = None,
job_metadata: Optional[Dict[str, str]] = None,
webhook_url: Optional[str] = None,
from_resourcename: Optional[str] = None,
to_resourceid: Optional[str] = None,
to_resourcename: Optional[str] = None,
to_variables: Optional[Dict[str, str]] = None,
to_metadata: Optional[Dict[str, str]] = None,
messagetype: Optional[str] = "plaintext",
priority: Optional[str] = "normal",
retry_attempts: Optional[int] = 1,
retry_interval: Optional[int] = 15,
) -> Dict[str, Any]:
"""Create an email job."""
client = get_client(environment, api_key, username, password, basic_auth)
payload = {
"job": {
"name": job_name,
"metadata": job_metadata,
},
"from": {
"resource": from_resource,
"resourcename": from_resourcename,
},
"to": [
{
"resourceid": to_resourceid,
"resource": to_resource,
"resourcename": to_resourcename,
"languagename": languagename,
"variables": to_variables,
"metadata": to_metadata,
}
],
"body": {
"message": message,
"subject": subject,
"options": {
"messagetype": messagetype
}
},
"schedule": {
"start": start_time,
"end": end_time,
"priority": priority,
"retryattempts": retry_attempts,
"retryinterval": retry_interval,
}
}
if webhook_url:
payload["job"].setdefault("options", {})["webhook"] = {"url": webhook_url}
return await client.post("/email/", payload)
async def get_job(
job_id: str,
job_type: str, # voice, text, email
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> Dict[str, Any]:
"""Get job details."""
client = get_client(environment, api_key, username, password, basic_auth)
return await client.get(f"/{job_type}/{job_id}")
async def list_jobs(
job_type: str, # voice, text, email
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
page: int = 1,
page_size: int = 10,
status: JobStatus = JobStatus.ALL,
) -> List[Dict[str, Any]]:
"""List jobs."""
client = get_client(environment, api_key, username, password, basic_auth)
params = {
"page": page,
"pagesize": page_size,
"status": status.value,
}
return await client.get(f"/{job_type}/list/", params=params)
async def cancel_job(
job_id: str,
job_type: str, # voice, text, email
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> Dict[str, Any]:
"""Cancel a job."""
client = get_client(environment, api_key, username, password, basic_auth)
return await client.delete(f"/{job_type}/{job_id}")
async def pause_resume_job(
job_id: str,
job_type: str, # voice, text, email
action: str, # pause or resume
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> Dict[str, Any]:
"""Pause or resume a job."""
client = get_client(environment, api_key, username, password, basic_auth)
payload = {
"jobid": job_id,
"action": action
}
return await client.put(f"/{job_type}/", payload)
async def send_followup_message(
job_id: str,
resource: str,
message: str,
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> Dict[str, Any]:
"""Send a follow-up text message."""
client = get_client(environment, api_key, username, password, basic_auth)
payload = {
"jobid": job_id,
"resource": resource,
"message": message
}
return await client.post("/text/msg/recipient/", payload)
from typing import Optional, List, Dict, Any
from ..api import IvrnetClient
from ..models import Environment
def get_client(
environment: Environment,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> IvrnetClient:
return IvrnetClient(environment, api_key, username, password, basic_auth)
async def clear_email_bounce_list(
resources: Optional[List[str]] = None,
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> Dict[str, Any]:
"""Clear email bounce list."""
client = get_client(environment, api_key, username, password, basic_auth)
if resources:
payload = {"to": [{"resource": r} for r in resources]}
return await client.delete("/email/bouncelist/clearlist/", data=payload)
else:
return await client.delete("/email/bouncelist/cleardomain/")
async def upload_text_optin_list(
resources: List[str],
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> Dict[str, Any]:
"""Upload text opt-in list."""
client = get_client(environment, api_key, username, password, basic_auth)
payload = {"to": [{"resource": r} for r in resources]}
return await client.post("/text/optin/", payload)
from typing import Optional, List, Dict, Any
from ..api import IvrnetClient
from ..models import Environment
def get_client(
environment: Environment,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> IvrnetClient:
return IvrnetClient(environment, api_key, username, password, basic_auth)
async def create_voice_file_placeholder(
filename: str,
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> Dict[str, Any]:
"""Create a voice file placeholder."""
client = get_client(environment, api_key, username, password, basic_auth)
payload = {"filename": filename}
return await client.post("/voice/audio/accesscode/", payload)
async def list_voice_files(
status: str = "completed", # completed or incomplete
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""List voice files."""
client = get_client(environment, api_key, username, password, basic_auth)
if status == "incomplete":
return await client.get("/voice/audio/list/accesscode/")
else:
return await client.get("/voice/audio/list/")
async def delete_voice_file(
file_id: str,
environment: Environment = Environment.PRODUCTION_CA,
api_key: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
basic_auth: Optional[str] = None,
) -> Dict[str, Any]:
"""Delete a voice file."""
client = get_client(environment, api_key, username, password, basic_auth)
return await client.delete(f"/voice/audio/file/{file_id}")
import pytest
from src.tools import jobs
from src.api import IvrnetClient
from src.models import Environment
def test_environment_enum():
assert Environment.PRODUCTION_CA == "production-ca"
def test_ivrnet_client_requires_auth():
with pytest.raises(ValueError):
IvrnetClient()
@pytest.mark.asyncio
async def test_create_email_job_webhook(monkeypatch):
captured = {}
class DummyClient:
async def post(self, endpoint, payload):
captured["endpoint"] = endpoint
captured["payload"] = payload
return {"ok": True}
monkeypatch.setattr(jobs, "get_client", lambda *a, **k: DummyClient())
await jobs.create_email_job(
from_resource="from",
to_resource="to",
languagename="en",
message={"en": "Hello"},
subject={"en": "Subject"},
start_time="2020-01-01T00:00:00Z",
end_time="2020-01-01T01:00:00Z",
webhook_url="https://example.com/hook",
)
assert captured["endpoint"] == "/email/"
assert captured["payload"]["job"]["options"]["webhook"]["url"] == "https://example.com/hook"
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment