Marketplace ai-handler
Integrate Replicate AI models with background processing, S3 storage, and credit systems
install
source · Clone the upstream repo
git clone https://github.com/aiskillstore/marketplace
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/aiskillstore/marketplace "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/aayushbaniya2006/ai-handler" ~/.claude/skills/aiskillstore-marketplace-ai-handler && rm -rf "$T"
manifest:
skills/aayushbaniya2006/ai-handler/SKILL.mdsource content
Replicate AI Handler Skill
This skill provides a production-ready pattern for integrating Replicate AI models. It handles long-running predictions using Inngest background jobs, stores results in S3, manages user credits, and updates database state.
Architecture
- Trigger: User requests a generation via API (e.g.,
)./api/app/ai-images - Validation: Check/deduct user credits.
- State: Create a database record with
.status: "processing" - Queue: Trigger an Inngest function to handle the Replicate API call.
- Processing:
- Call Replicate API.
- Wait for completion (polling or webhook).
- Download result and upload to S3 (server-side).
- Completion: Update database record with S3 URL and
.status: "completed" - Failure: Refund credits if failed (optional) and update status to
.failed
Prerequisites
package installed (replicate
).npm install replicate
inREPLICATE_API_TOKEN
..env- S3 and Inngest configured.
Implementation Steps
1. API Route (Trigger)
src/app/api/app/generate/route.ts
import withAuthRequired from "@/lib/auth/withAuthRequired"; import { db } from "@/db"; import { generations } from "@/db/schema"; import { inngest } from "@/lib/inngest/client"; import { checkCredits, deductCredits } from "@/lib/credits"; // Hypothetical helpers export const POST = withAuthRequired(async (req, { session }) => { const body = await req.json(); // 1. Check Credits const hasCredits = await checkCredits(session.user.id, "image_generation", 1); if (!hasCredits) return new Response("Insufficient credits", { status: 403 }); // 2. Create DB Record (Pending) const [record] = await db.insert(generations).values({ userId: session.user.id, prompt: body.prompt, status: "processing", }).returning(); // 3. Deduct Credits (Optimistic) await deductCredits(session.user.id, "image_generation", 1, { source: "api", refId: record.id }); // 4. Trigger Background Job await inngest.send({ name: "app/ai.generate", data: { generationId: record.id, prompt: body.prompt, userId: session.user.id } }); return Response.json({ id: record.id, status: "processing" }); });
2. Inngest Function (Processor)
src/lib/inngest/functions/app/ai/generate.ts
import { inngest } from "@/lib/inngest/client"; import Replicate from "replicate"; import uploadFromServer from "@/lib/s3/uploadFromServer"; import { db } from "@/db"; import { generations } from "@/db/schema"; import { eq } from "drizzle-orm"; const replicate = new Replicate({ auth: process.env.REPLICATE_API_TOKEN }); export const generateAI = inngest.createFunction( { id: "ai-generation-worker", concurrency: 5 }, { event: "app/ai.generate" }, async ({ event, step }) => { const { generationId, prompt } = event.data; try { // 1. Call Replicate (Step ensures retries on network error) const prediction = await step.run("call-replicate", async () => { return await replicate.predictions.create({ version: "model-version-hash", input: { prompt } }); }); // 2. Wait for completion // Replicate usually takes time. We can use waitForEvent if using webhooks, // or simple polling loop with sleep if webhooks aren't set up. // For simplicity, here is a polling pattern using sleep: let finalPrediction = prediction; while (finalPrediction.status !== "succeeded" && finalPrediction.status !== "failed") { await step.sleep("wait-for-gpu", "5s"); finalPrediction = await step.run("check-status", () => replicate.predictions.get(prediction.id) ); } if (finalPrediction.status === "failed") { throw new Error(finalPrediction.error); } // 3. Upload to S3 // Replicate returns a temporary URL. We must persist it. const outputUrl = finalPrediction.output[0]; // Adjust based on model output const s3Url = await step.run("upload-to-s3", async () => { // Fetch image buffer const response = await fetch(outputUrl); const arrayBuffer = await response.arrayBuffer(); const base64 = Buffer.from(arrayBuffer).toString("base64"); // Use existing S3 skill return await uploadFromServer({ file: base64, path: `generations/${generationId}.png`, contentType: "image/png" }); }); // 4. Update DB await step.run("update-db", async () => { await db.update(generations) .set({ status: "completed", url: s3Url }) .where(eq(generations.id, generationId)); }); } catch (error) { // Handle Failure await step.run("mark-failed", async () => { await db.update(generations) .set({ status: "failed" }) .where(eq(generations.id, generationId)); // Optional: Refund credits here }); throw error; // Re-throw to show failure in Inngest dashboard } } );