import requests
import time
# Example script: upload a local video, finalize the upload, run an agent on
# it, and poll the run until it reaches a terminal state.
API_KEY = "mk_your_api_key"
BASE_URL = "https://api.mosaic.so"
headers = {"Authorization": f"Bearer {API_KEY}"}
# Headers for the JSON API calls below: the auth header plus an explicit
# JSON content type.
json_headers = {**headers, "Content-Type": "application/json"}

# Timeouts in seconds. requests applies no timeout by default, so a stalled
# connection would otherwise hang the script forever. The value bounds the
# connect phase and each wait for data, not the total transfer time.
REQUEST_TIMEOUT = 30
UPLOAD_TIMEOUT = 300  # the upload moves the whole video file
POLL_INTERVAL = 5  # seconds between status checks

# Step 1: Get upload URL and policy fields
response = requests.post(
    f"{BASE_URL}/videos/get_upload_url",
    headers=json_headers,
    json={"filename": "my-video.mp4", "content_type": "video/mp4"},
    timeout=REQUEST_TIMEOUT,
)
# Fail here with the HTTP error rather than later with a KeyError on the
# missing "upload_url" / "fields" / "video_id" keys.
response.raise_for_status()
upload_data = response.json()

# Step 2: Upload video with policy fields (multipart form)
with open("my-video.mp4", "rb") as video_file:
    # requests encodes the `data` fields before the `files` parts, so the
    # policy fields precede the file in the multipart body.
    files = {"file": ("my-video.mp4", video_file, "video/mp4")}
    # The request must be sent while the file is still open. No Authorization
    # header is sent to the upload URL; only the policy fields accompany the
    # file.
    response = requests.post(
        upload_data["upload_url"],
        data=upload_data["fields"],  # Policy fields
        files=files,
        timeout=UPLOAD_TIMEOUT,
    )
# Only 204 (No Content) is treated as a successful upload.
if response.status_code != 204:
    raise RuntimeError(f"Upload failed with status {response.status_code}")

# Step 3: Finalize upload
finalize_response = requests.post(
    f"{BASE_URL}/videos/finalize_upload",
    headers=json_headers,
    json={"video_id": upload_data["video_id"]},
    timeout=REQUEST_TIMEOUT,
)
# Do not start an agent run on a video whose finalize call was rejected.
finalize_response.raise_for_status()

# Step 4: Run agent
agent_id = "{agent_id}"  # Placeholder: replace with the ID of the agent to run
run_response = requests.post(
    f"{BASE_URL}/agent/{agent_id}/run",
    headers=json_headers,
    json={"video_ids": [upload_data["video_id"]]},
    timeout=REQUEST_TIMEOUT,
)
run_response.raise_for_status()
run_id = run_response.json()["run_id"]

# Step 5: Poll for status
while True:
    status_response = requests.get(
        f"{BASE_URL}/agent_run/{run_id}",
        headers=headers,
        timeout=REQUEST_TIMEOUT,
    )
    status_response.raise_for_status()
    status_data = status_response.json()
    # "completed" and "failed" are the terminal states; any other status
    # means the run is still in progress, so keep polling.
    if status_data["status"] in ("completed", "failed"):
        print(f"Agent run {status_data['status']}")
        if status_data["status"] == "completed":
            for output in status_data["outputs"]:
                print(f"Output video: {output['video_url']}")
        break
    time.sleep(POLL_INTERVAL)  # Poll every 5 seconds