Production RAG for WordPress: pgvector + FastAPI backend, secure webhook intake, and a shortcode chat UI

Overview
This tutorial wires WordPress to a production-grade RAG backend:
– Intake: WordPress Media upload triggers a signed webhook to the backend.
– Index: Backend fetches the file, chunks text, stores embeddings in Postgres/pgvector.
– Serve: FastAPI endpoint answers user questions via retrieval-augmented generation.
– Frontend: A WordPress shortcode renders a chat box that queries the backend.

We’ll keep the stack minimal and production-ready:
– WordPress (webhook + shortcode)
– Python FastAPI backend
– Postgres + pgvector
– OpenAI embeddings + model (swap as needed)
– Nginx or cloud proxy, HTTPS, and API key auth

Architecture
1) User uploads PDF/Doc to WordPress Media.
2) WordPress sends a signed webhook: a JSON body {site_id, file_url, title, attachment_id} plus an HMAC signature in the X-WP-Signature header.
3) Backend validates the HMAC, downloads file, extracts text, chunks, embeds, stores in pgvector with a collection/site scope.
4) Chat UI (shortcode) posts the question and a per-site api_key to /rag/query and displays the grounded answer.

Prereqs
– WordPress admin access
– Python 3.11+, FastAPI, uvicorn
– Postgres 14+ with pgvector
– OpenAI API key (or compatible embedding/LLM)
– A secret shared between WP and backend for webhook signing

Database setup (pgvector)
-- Enable extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Documents table
CREATE TABLE IF NOT EXISTS documents (
    id UUID PRIMARY KEY,
    site_id TEXT NOT NULL,
    doc_id TEXT NOT NULL, -- WP attachment ID or slug
    title TEXT,
    source_url TEXT,
    created_at TIMESTAMPTZ DEFAULT now()
);

-- Chunks table
CREATE TABLE IF NOT EXISTS doc_chunks (
    id UUID PRIMARY KEY,
    doc_id UUID REFERENCES documents(id) ON DELETE CASCADE,
    idx INT NOT NULL,
    content TEXT NOT NULL,
    embedding vector(1536), -- match embedding size
    token_count INT,
    created_at TIMESTAMPTZ DEFAULT now()
);

-- Index for ANN search
CREATE INDEX IF NOT EXISTS doc_chunks_embedding_ivfflat
    ON doc_chunks USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);

-- Filter speedups
CREATE INDEX IF NOT EXISTS doc_chunks_doc_id_idx ON doc_chunks(doc_id);
CREATE INDEX IF NOT EXISTS documents_site_doc_idx ON documents(site_id, doc_id);
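
As a rough starting point for the lists value above, the pgvector README suggests rows / 1000 for up to about 1M rows and sqrt(rows) beyond that, with probes starting near sqrt(lists). A minimal sketch of that rule of thumb follows; the helper name suggest_ivfflat_params is hypothetical, and the numbers are only a first guess to validate against recall on your own data.

```python
import math


def suggest_ivfflat_params(n_rows: int) -> dict:
    """Heuristic starting values for an ivfflat index; not a tuned result."""
    if n_rows <= 1_000_000:
        lists = max(1, n_rows // 1000)
    else:
        lists = int(math.sqrt(n_rows))
    # Probes trade recall for speed; sqrt(lists) is a common first guess.
    probes = max(1, int(math.sqrt(lists)))
    return {"lists": lists, "probes": probes}


print(suggest_ivfflat_params(100_000))
```

The probes value is applied per session or transaction with SET ivfflat.probes = N; rather than at index creation.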

FastAPI backend (app/main.py)
– Provides /webhook/wp-media to index uploads.
– Provides /rag/query for Q&A.
– Uses HMAC-SHA256 signature (X-WP-Signature) header.

from fastapi import FastAPI, Header, HTTPException, Depends, Request
from pydantic import BaseModel
import hmac, hashlib, os, uuid, httpx
import asyncpg
from typing import List, Optional
from fastapi.middleware.cors import CORSMiddleware
from openai import AsyncOpenAI

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")  # shared with WP
DATABASE_URL = os.getenv("DATABASE_URL")  # postgres://…
EMBED_MODEL = "text-embedding-3-small"
GEN_MODEL = "gpt-4o-mini"

app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["https://your-site.com"], allow_methods=["*"], allow_headers=["*"])

client = AsyncOpenAI(api_key=OPENAI_API_KEY)

async def db():
    # Lazily create one shared connection pool
    if not hasattr(app.state, "pool"):
        app.state.pool = await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=8)
    return app.state.pool

def verify_signature(raw_body: bytes, signature: str) -> bool:
    mac = hmac.new(WEBHOOK_SECRET.encode(), raw_body, hashlib.sha256).hexdigest()
    # Compare as bytes so a non-ASCII header value cannot raise
    return hmac.compare_digest(mac.encode(), signature.encode())

class WPWebhook(BaseModel):
    site_id: str
    file_url: str
    title: Optional[str] = None
    attachment_id: str

@app.post("/webhook/wp-media")
async def wp_media(webhook: WPWebhook, request: Request, x_wp_signature: Optional[str] = Header(None), pool=Depends(db)):
    # Verify the signature over the exact bytes WordPress sent (Starlette caches the body)
    raw_body = await request.body()
    if not x_wp_signature or not verify_signature(raw_body, x_wp_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Download file
    async with httpx.AsyncClient(timeout=60) as http:
        r = await http.get(webhook.file_url)
        r.raise_for_status()
        content = r.content

    # Extract text (PDF/doc). Minimal example uses pdfminer.six if PDF; else fallback.
    text = await extract_text_auto(webhook.file_url, content)
    chunks = simple_chunk(text, max_chars=1200, overlap=100)

    # Insert document
    doc_uuid = str(uuid.uuid4())
    async with pool.acquire() as conn:
        await conn.execute(
            "INSERT INTO documents(id, site_id, doc_id, title, source_url) VALUES($1,$2,$3,$4,$5)",
            doc_uuid, webhook.site_id, webhook.attachment_id, webhook.title, webhook.file_url
        )

    # Embed and insert chunks
    embeddings = await embed_texts([c["content"] for c in chunks])
    async with pool.acquire() as conn:
        async with conn.transaction():
            for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
                await conn.execute(
                    "INSERT INTO doc_chunks(id, doc_id, idx, content, embedding, token_count) VALUES($1,$2,$3,$4,$5::vector,$6)",
                    # asyncpg has no built-in vector codec, so pass the text form "[0.1, 0.2, ...]"
                    str(uuid.uuid4()), doc_uuid, i, chunk["content"], str(emb), chunk["tokens"]
                )
    return {"status": "ok", "doc_id": doc_uuid, "chunks": len(chunks)}

async def extract_text_auto(url: str, content: bytes) -> str:
    import mimetypes, tempfile
    mt = mimetypes.guess_type(url)[0] or ""
    if "pdf" in mt or url.lower().endswith(".pdf"):
        from pdfminer.high_level import extract_text
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as f:
            f.write(content)
            f.flush()
            out = extract_text(f.name)
        os.unlink(f.name)
        return out or ""
    # Basic fallback: treat the file as UTF-8 text
    return content.decode("utf-8", errors="ignore")

def simple_chunk(text: str, max_chars=1200, overlap=100):
    text = text.strip()
    if not text:
        return []
    chunks = []
    i = 0
    while i < len(text):
        end = min(i + max_chars, len(text))
        chunks.append({"content": text[i:end], "tokens": int((end - i) / 4)})  # rough est
        if end == len(text):
            break  # last chunk; stepping back by the overlap here would loop forever
        i = end - overlap
    return chunks

async def embed_texts(texts: List[str]):
    if not texts:
        return []
    resp = await client.embeddings.create(model=EMBED_MODEL, input=texts)
    return [e.embedding for e in resp.data]

class QueryBody(BaseModel):
    site_id: str
    question: str
    k: int = 5
    api_key: Optional[str] = None  # simple per-site key

def require_site_key(key: Optional[str], site_id: str):
    # site_id must be a slug that is valid inside an env var name (not a full URL)
    expected = os.getenv(f"SITE_{site_id.upper()}_KEY")
    # Fail closed: reject when no key is configured for the site
    if not expected or not hmac.compare_digest(expected.encode(), (key or "").encode()):
        raise HTTPException(status_code=401, detail="Invalid API key")

@app.post("/rag/query")
async def rag_query(q: QueryBody, pool=Depends(db)):
    require_site_key(q.api_key, q.site_id)
    # Embed question
    qemb = (await embed_texts([q.question]))[0]
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT c.content, 1 - (c.embedding <=> $1::vector) AS score
            FROM doc_chunks c
            JOIN documents d ON d.id = c.doc_id
            WHERE d.site_id = $2
            ORDER BY c.embedding <=> $1::vector
            LIMIT $3
            """,
            str(qemb), q.site_id, q.k
        )
    # Number the chunks so the model can cite them as [1], [2], ...
    context = "\n\n".join(f"[{i + 1}] {r['content']}" for i, r in enumerate(rows))

    prompt = f"You are a helpful assistant. Use the context to answer.\n\nContext:\n{context}\n\nQuestion: {q.question}\nAnswer concisely with citations like [chunk #]."
    messages = [{"role": "user", "content": prompt}]
    comp = await client.chat.completions.create(model=GEN_MODEL, messages=messages, temperature=0.2)
    answer = comp.choices[0].message.content
    return {"answer": answer, "hits": len(rows)}

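Note: The signature must be verified against the exact bytes WordPress sent. Read them with await request.body() inside the route (or cache them in a middleware) before trusting the parsed JSON; re-serializing the parsed payload can change escaping or key order and will break the HMAC comparison.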
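
To see why the raw bytes matter, here is a minimal sketch. SECRET and the sample payload are made up for illustration; the only behavior it relies on is that PHP's json_encode (which wp_json_encode wraps) escapes "/" as "\/" by default, while Python's json.dumps does not.

```python
import hashlib
import hmac
import json

SECRET = b"demo-secret"  # placeholder; load the real value from the environment


def sign(body: bytes) -> str:
    return hmac.new(SECRET, body, hashlib.sha256).hexdigest()


def verify(body: bytes, signature: str) -> bool:
    return hmac.compare_digest(sign(body), signature)


# Bytes as a PHP sender would produce them: slashes escaped, no spaces.
raw_body = b'{"site_id":"https:\\/\\/example.com","attachment_id":"42"}'
signature = sign(raw_body)

# Parsing and re-serializing keeps the data but changes the bytes.
reserialized = json.dumps(json.loads(raw_body)).encode()

print(verify(raw_body, signature))      # exact bytes from the request
print(verify(reserialized, signature))  # re-encoded bytes
```

The first check passes and the second fails, even though both byte strings decode to the same JSON object.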

WordPress: webhook sender (plugin)
Create a small MU-plugin or standard plugin to post to the backend on upload.

<?php
// Fires once for each newly uploaded attachment.
add_action('add_attachment', function ($post_ID) {
    $post = get_post($post_ID);
    if (!$post || $post->post_type !== 'attachment') return;

    $file_url = wp_get_attachment_url($post_ID);
    $title = get_the_title($post_ID);
    $site_id = get_bloginfo('url'); // or a fixed slug
    $payload = array(
        'site_id' => $site_id,
        'file_url' => $file_url,
        'title' => $title,
        'attachment_id' => strval($post_ID),
    );
    $json = wp_json_encode($payload);
    $secret = getenv('AI_WEBHOOK_SECRET');
    if (!$secret) return; // never sign with a default secret
    $sig = hash_hmac('sha256', $json, $secret);

    $resp = wp_remote_post('https://api.your-backend.com/webhook/wp-media', array(
        'headers' => array(
            'Content-Type' => 'application/json',
            'X-WP-Signature' => $sig
        ),
        'body' => $json,
        'timeout' => 30
    ));
    if (is_wp_error($resp)) error_log('AI webhook failed: ' . $resp->get_error_message());
});

Shortcode chat UI
Adds [ai_chat] shortcode and a minimal UI that posts to /rag/query.

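function ai_chat_shortcode($atts){
    $a = shortcode_atts(array(
        'placeholder' => 'Ask about our docs…',
        'site_id' => get_bloginfo('url'),
    ), $atts);
    // Placeholder names: define AI_CHAT_SITE_KEY in wp-config.php or save an ai_chat_site_key option.
    $key = defined('AI_CHAT_SITE_KEY') ? AI_CHAT_SITE_KEY : get_option('ai_chat_site_key', '');
    ob_start(); ?>
    <div id="ai-chat">
        <div id="ai-chat-log"></div>
        <input id="ai-chat-q" type="text" placeholder="<?php echo esc_attr($a['placeholder']); ?>" style="width:100%;padding:8px;" />
        <button id="ai-chat-send" type="button">Send</button>
    </div>
    <script>
    (function(){
        const api = 'https://api.your-backend.com/rag/query';
        const siteId = <?php echo wp_json_encode($a['site_id']); ?>;
        const key = <?php echo wp_json_encode($key); ?>;
        const log = document.getElementById('ai-chat-log');
        const q = document.getElementById('ai-chat-q');
        // Append messages as text so user input and model output are never parsed as HTML.
        function addLine(label, text){
            const p = document.createElement('p');
            p.textContent = label + text;
            log.appendChild(p);
        }
        document.getElementById('ai-chat-send').addEventListener('click', async function(){
            const question = q.value.trim();
            if(!question) return;
            addLine('You: ', question);
            q.value = '';
            try {
                const r = await fetch(api, {
                    method: 'POST',
                    headers: {'Content-Type':'application/json'},
                    body: JSON.stringify({site_id: siteId, question, api_key: key})
                });
                const data = await r.json();
                addLine('AI: ', data.answer || 'No answer');
            } catch(e){
                addLine('', 'Error contacting AI backend.');
            }
        });
    })();
    </script>
    <?php
    return ob_get_clean();
}
add_shortcode('ai_chat', 'ai_chat_shortcode');

Set the per-site key via a small admin page, or define it in wp-config.php and fall back to get_option. Any key rendered into page JavaScript is visible to visitors, so treat it as a throttling token rather than a secret.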

Security and performance
– Transport: Enforce HTTPS end-to-end. Set CORS to your WP origin only.
– Auth: Use HMAC for webhooks and per-site API keys for /rag/query. Rotate keys regularly.
– Limits: Cap file size on WP, and validate mimetypes server-side. Queue large files.
– Costs: Use a small embedding model for indexing; cache embeddings by hash.
– Indexing: Run embedding in a background worker if uploads are frequent. Return 202 and poll status.
– Vector search: Tune ivfflat lists (and ivfflat.probes at query time) to your data size, and run ANALYZE after bulk loads. Consider HNSW (pgvector 0.5.0+).
– Token control: Limit k and compress context (dedupe, summarization).
– Observability: Log latency, chunk counts, and hit scores. Add simple eval prompts for regression checks.
– Deployment:
– Postgres: managed instance with pgvector.
– Backend: Fly.io/Render/VM with health checks and 2+ stateless replicas (no sticky sessions).
– Secrets: Use platform secrets, not hard-coded keys.
– CDN: Serve static JS/CSS via WP enqueue, cache API via short TTL if answers are stable.
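
The "cache embeddings by hash" item can be sketched as below. This is an in-memory illustration under stated assumptions: EmbeddingCache and fake_embed are hypothetical names, and a real deployment would back the store with Postgres or Redis and call the actual embedding API instead of the stand-in function.

```python
import hashlib
from typing import Callable, Dict, List


class EmbeddingCache:
    """Skip re-embedding text whose content hash has been seen before."""

    def __init__(self, embed_fn: Callable[[List[str]], List[List[float]]]):
        self._embed_fn = embed_fn
        self._store: Dict[str, List[float]] = {}

    @staticmethod
    def key(text: str) -> str:
        return hashlib.sha256(text.strip().encode("utf-8")).hexdigest()

    def get_many(self, texts: List[str]) -> List[List[float]]:
        keys = [self.key(t) for t in texts]
        # Collect each unseen text once, preserving order.
        missing: Dict[str, str] = {}
        for k, t in zip(keys, texts):
            if k not in self._store and k not in missing:
                missing[k] = t
        if missing:
            vectors = self._embed_fn(list(missing.values()))
            for k, v in zip(missing.keys(), vectors):
                self._store[k] = v
        return [self._store[k] for k in keys]


calls: List[int] = []


def fake_embed(texts: List[str]) -> List[List[float]]:
    # Stand-in for the real embedding call; records the batch size.
    calls.append(len(texts))
    return [[float(len(t))] for t in texts]


cache = EmbeddingCache(fake_embed)
first = cache.get_many(["a", "bb", "a"])
second = cache.get_many(["bb", "ccc"])
print(calls)
```

Re-uploading an unchanged document then costs no embedding calls, since every chunk hash is already present.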

Local testing
– Create .env with OPENAI_API_KEY, WEBHOOK_SECRET, DATABASE_URL, SITE_{SITEID}_KEY.
– Run: uvicorn app.main:app --host 0.0.0.0 --port 8080 --proxy-headers
– Post a test webhook with curl and validate doc/chunk counts.
– Use the [ai_chat] shortcode on a test page.

What to adjust
– Swap extractors (unstructured, textract) for DOCX/HTML.
– Replace OpenAI with local or Azure endpoints by changing embed/generation calls.
– Add per-document metadata filters (post type, tags) in the query.

Build a Secure AI API Gateway with Django for WordPress (JWT, Rate Limits, Cost Tracking)

Overview
This tutorial walks through implementing a secure AI API gateway in Django that your WordPress sites can call. It handles JWT auth, per-site rate limits with Redis, provider failover, streaming, and cost tracking. You’ll get a minimal WordPress client to invoke the gateway without exposing raw provider keys.

Architecture
– WordPress site(s) -> your Django Gateway (JWT) -> LLM provider(s)
– Redis for rate limiting and idempotency
– PostgreSQL for tenants, usage, and audit logs
– Optional signed webhooks from Gateway back to WordPress for async tasks

Prerequisites
– Python 3.11+, Django 5.x, djangorestframework
– Redis, Postgres
– Provider keys (e.g., OpenAI/Azure OpenAI/Anthropic)
– A domain with HTTPS (e.g., api.example.com)
– WordPress with REST API enabled

Django project setup
– django-admin startproject ai_gateway
– pip install djangorestframework PyJWT redis httpx pydantic python-dotenv
– Add rest_framework to INSTALLED_APPS

Models (ai/models.py)
from django.db import models

class Tenant(models.Model):
    name = models.CharField(max_length=120)
    slug = models.SlugField(unique=True)
    jwt_secret = models.CharField(max_length=128)  # per-tenant JWT secret
    rate_limit_rpm = models.IntegerField(default=60)
    rate_limit_rpd = models.IntegerField(default=5000)
    active = models.BooleanField(default=True)

class UsageEvent(models.Model):
    tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE)
    provider = models.CharField(max_length=32)
    model = models.CharField(max_length=64)
    input_tokens = models.IntegerField(default=0)
    output_tokens = models.IntegerField(default=0)
    cost_usd = models.DecimalField(max_digits=10, decimal_places=6, default=0)
    request_id = models.CharField(max_length=64, db_index=True)
    created_at = models.DateTimeField(auto_now_add=True)
    status = models.CharField(max_length=16, default='ok')  # ok|error|timeout

Settings (ai_gateway/settings.py)
– Configure DATABASES and CACHES (Redis)
– Add env flags
OPENAI_API_KEY=…
ANTHROPIC_API_KEY=…
DEFAULT_PROVIDER=openai
STREAM_DEFAULT=true

JWT auth (ai/auth.py)
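import time, jwt
from django.http import JsonResponse
from .models import Tenant

def authenticate(request):
    auth = request.headers.get('Authorization', '')
    if not auth.startswith('Bearer '):
        return None, JsonResponse({'error': 'missing_bearer'}, status=401)
    token = auth.split(' ', 1)[1]
    try:
        # Decode without verification only to find the tenant slug
        unverified = jwt.decode(token, options={'verify_signature': False}, algorithms=['HS256'])
        t = Tenant.objects.get(slug=unverified.get('sub'), active=True)
        # Verify the signature with that tenant's secret
        payload = jwt.decode(token, t.jwt_secret, algorithms=['HS256'])
        if payload.get('exp', 0) < time.time():
            return None, JsonResponse({'error': 'token_expired'}, status=401)
        return t, None
    except (jwt.PyJWTError, Tenant.DoesNotExist):
        return None, JsonResponse({'error': 'invalid_token'}, status=401)

Rate limits (ai/limits.py)
# Minimal fixed-window sketch; the key names and REDIS_URL are assumptions
import os, time, redis
r = redis.Redis.from_url(os.environ.get('REDIS_URL', 'redis://localhost:6379/0'))

def check_limits(tenant, scope):
    now = int(time.time())
    m_key = f'rl:{tenant.slug}:{scope}:m:{now // 60}'
    d_key = f'rl:{tenant.slug}:{scope}:d:{now // 86400}'
    pipe = r.pipeline()
    pipe.incr(m_key); pipe.expire(m_key, 120)
    pipe.incr(d_key); pipe.expire(d_key, 172800)
    m_count, _, d_count, _ = pipe.execute()
    if m_count > tenant.rate_limit_rpm: return False, 'rate_limited_minute'
    if d_count > tenant.rate_limit_rpd: return False, 'rate_limited_day'
    return True, None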

Provider proxy (ai/providers.py)
import httpx, os

class ProviderError(Exception): pass

async def call_openai(payload, stream=False):
    headers = {"Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}"}
    url = "https://api.openai.com/v1/chat/completions"
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.post(url, headers=headers, json=payload)
        if resp.status_code >= 400:
            raise ProviderError(resp.text)
        return resp

async def call_anthropic(payload, stream=False):
    headers = {"x-api-key": os.environ['ANTHROPIC_API_KEY'], "anthropic-version": "2023-06-01"}
    url = "https://api.anthropic.com/v1/messages"
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.post(url, headers=headers, json=payload)
        if resp.status_code >= 400:
            raise ProviderError(resp.text)
        return resp

async def provider_call(provider, payload, stream=False):
    if provider == 'openai': return await call_openai(payload, stream)
    if provider == 'anthropic': return await call_anthropic(payload, stream)
    raise ProviderError('unsupported_provider')

Cost estimation (ai/costs.py)
# Simple example; update with current pricing
PRICES = {
    ('openai', 'gpt-4o-mini'): (0.00015, 0.0006),  # input, output per 1k tokens
    ('anthropic', 'claude-3-haiku'): (0.00025, 0.00125),
}

def estimate_cost(provider, model, in_toks, out_toks):
    key = (provider, model)
    if key not in PRICES: return 0.0
    inp, outp = PRICES[key]
    return round((in_toks/1000.0)*inp + (out_toks/1000.0)*outp, 6)
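
Because cost_usd is stored in a DecimalField, one option is to do the arithmetic in Decimal as well, which avoids accumulating float error when summing many small charges. The sketch below is a variant, not the gateway's required implementation; estimate_cost_decimal and PRICES_PER_1K are hypothetical names, and the price is the same example figure used above.

```python
from decimal import Decimal, ROUND_HALF_UP

# (input, output) USD per 1k tokens; example figures only.
PRICES_PER_1K = {
    ("openai", "gpt-4o-mini"): (Decimal("0.00015"), Decimal("0.0006")),
}


def estimate_cost_decimal(provider: str, model: str, in_toks: int, out_toks: int) -> Decimal:
    prices = PRICES_PER_1K.get((provider, model))
    if prices is None:
        return Decimal("0")
    inp, outp = prices
    cost = (Decimal(in_toks) / 1000) * inp + (Decimal(out_toks) / 1000) * outp
    # Match the six decimal places of the cost_usd column.
    return cost.quantize(Decimal("0.000001"), rounding=ROUND_HALF_UP)


# 1200 input tokens: 1.2 * 0.00015 = 0.00018
# 350 output tokens: 0.35 * 0.0006 = 0.00021
print(estimate_cost_decimal("openai", "gpt-4o-mini", 1200, 350))
```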

Views (ai/views.py)
import json, uuid
from asgiref.sync import sync_to_async
from django.views import View
from django.http import JsonResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from .auth import authenticate
from .limits import check_limits
from .models import UsageEvent
from .providers import provider_call, ProviderError
from .costs import estimate_cost

# API clients authenticate with a JWT, not a session cookie, so CSRF checks are skipped
@method_decorator(csrf_exempt, name='dispatch')
class ChatView(View):
    async def post(self, request):
        # authenticate() and check_limits() do blocking I/O, so run them in a thread
        tenant, err = await sync_to_async(authenticate)(request)
        if err: return err
        ok, reason = await sync_to_async(check_limits)(tenant, 'chat')
        if not ok: return JsonResponse({'error': reason}, status=429)

        try:
            body = json.loads(request.body.decode('utf-8'))
        except ValueError:
            return JsonResponse({'error': 'bad_json'}, status=400)
        provider = body.get('provider', 'openai')
        model = body.get('model', 'gpt-4o-mini')
        messages = body.get('messages', [])
        request_id = body.get('request_id') or str(uuid.uuid4())

        # This minimal view returns whole JSON responses; streaming needs StreamingHttpResponse
        payload = {'model': model, 'messages': messages}

        try:
            resp = await provider_call(provider, payload)
            data = resp.json()
            # Token counts: adapt to provider response
            in_toks = data.get('usage', {}).get('prompt_tokens', 0)
            out_toks = data.get('usage', {}).get('completion_tokens', 0)
            cost = estimate_cost(provider, model, in_toks, out_toks)
            await UsageEvent.objects.acreate(
                tenant=tenant, provider=provider, model=model,
                input_tokens=in_toks, output_tokens=out_toks,
                cost_usd=cost, request_id=request_id, status='ok'
            )
            return JsonResponse({'id': request_id, 'provider': provider, 'model': model, 'data': data})
        except ProviderError as e:
            await UsageEvent.objects.acreate(
                tenant=tenant, provider=provider, model=model,
                input_tokens=0, output_tokens=0, cost_usd=0, request_id=request_id, status='error'
            )
            return JsonResponse({'error': 'provider_error', 'detail': str(e)}, status=502)
        except Exception as e:
            return JsonResponse({'error': 'gateway_error', 'detail': str(e)}, status=500)

URLs (ai/urls.py)
from django.urls import path
from .views import ChatView
urlpatterns = [path('v1/chat', ChatView.as_view(), name='chat')]

Project urls (ai_gateway/urls.py)
from django.urls import path, include
urlpatterns = [path('api/', include('ai.urls'))]

Create a tenant (Django shell)
from ai.models import Tenant
import secrets
Tenant.objects.create(name='My WP Site', slug='mysite', jwt_secret=secrets.token_urlsafe(48), rate_limit_rpm=30, rate_limit_rpd=3000)

Issue a JWT (server-side script)
import jwt, time
slug = 'mysite'; secret = 'PASTE_TENANT_SECRET'
token = jwt.encode({'sub': slug, 'exp': int(time.time()) + 3600}, secret, algorithm='HS256')
print(token)

Test with curl
curl -X POST https://api.example.com/api/v1/chat \
  -H "Authorization: Bearer YOUR_JWT" \
  -H "Content-Type: application/json" \
  -d '{"provider":"openai","model":"gpt-4o-mini","messages":[{"role":"user","content":"Hello"}]}'
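
To show what the token issued above actually contains, here is a standard-library sketch of HS256 signing and verification. It is for understanding only; the function names are hypothetical, and production code should keep using PyJWT as in the gateway.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_hs256(claims: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join([
        _b64url(json.dumps(header, separators=(",", ":")).encode()),
        _b64url(json.dumps(claims, separators=(",", ":")).encode()),
    ])
    sig = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + _b64url(sig)


def decode_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    header_b64, payload_b64, sig_b64 = token.split(".")
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = hmac.new(secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if claims.get("exp", 0) < current:
        raise ValueError("token expired")
    return claims


token = encode_hs256({"sub": "mysite", "exp": int(time.time()) + 3600}, "demo-secret")
print(decode_hs256(token, "demo-secret")["sub"])
```

The sub claim is readable without the secret, which is why the gateway can look up the tenant first and verify the signature second.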

WordPress minimal client (functions.php or small plugin)
function aigw_chat($prompt) {
    $endpoint = 'https://api.example.com/api/v1/chat';
    $token = getenv('AIGW_JWT'); // or store in wp_options securely
    $body = array(
        'provider' => 'openai',
        'model' => 'gpt-4o-mini',
        'messages' => array(
            array('role' => 'system', 'content' => 'You are a helpful assistant.'),
            array('role' => 'user', 'content' => $prompt),
        ),
        'request_id' => wp_generate_uuid4()
    );
    $resp = wp_remote_post($endpoint, array(
        'headers' => array('Authorization' => 'Bearer ' . $token, 'Content-Type' => 'application/json'),
        'body' => wp_json_encode($body),
        'timeout' => 30,
    ));
    if (is_wp_error($resp)) return 'Gateway error.';
    $code = wp_remote_retrieve_response_code($resp);
    $json = json_decode(wp_remote_retrieve_body($resp), true);
    if ($code !== 200) return 'Error: ' . sanitize_text_field($json['error'] ?? 'unknown');
    // Extract text depending on provider shape (example for OpenAI)
    $msg = $json['data']['choices'][0]['message']['content'] ?? '';
    return wp_kses_post($msg);
}

Add shortcode
add_shortcode('aigw', function($atts, $content = '') {
    $prompt = $content ?: ($atts['q'] ?? 'Say hi.');
    return aigw_chat($prompt);
});

Usage in WordPress
[aigw]Write a 1-sentence welcome message for new subscribers.[/aigw]

Optional: signed webhooks to WordPress
– For long tasks, post results back to WP with an HMAC header.
– WordPress verifies signature before saving.

Django webhook signer (utils)
import hmac, hashlib, base64, os
WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET', '')

def sign_body(body_bytes):
    sig = hmac.new(WEBHOOK_SECRET.encode(), body_bytes, hashlib.sha256).digest()
    return 'sha256=' + base64.b64encode(sig).decode()

WP verify webhook (endpoint handler)
$raw = file_get_contents('php://input');
$sig = $_SERVER['HTTP_X_SIG'] ?? '';
$calc = 'sha256=' . base64_encode(hash_hmac('sha256', $raw, getenv('AIGW_WEBHOOK_SECRET'), true));
if (!hash_equals($calc, $sig)) { status_header(401); exit; }

Operational guidance
– Run Django with ASGI (uvicorn/daphne) behind Nginx. Enable HTTPS.
– Store provider keys and JWT secrets in env vars or a secrets manager.
– Use Redis for limits and request dedupe. Add a 60s idempotency key on request_id.
– Log all 4xx/5xx with request_id. Ship logs to your SIEM.
– Monitor cost by aggregating UsageEvent daily. Alert on anomalies.
– Backoff and failover: if provider_error, retry with alternate provider/model when safe.
– Implement per-tenant model allowlist if needed.
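
The backoff-and-failover item can be sketched as follows. This is one possible shape, not the gateway's code: call_with_failover, the attempt counts, and the delays are assumptions, and the provider callables stand in for functions like call_openai and call_anthropic.

```python
import asyncio


class ProviderError(Exception):
    pass


async def call_with_failover(providers, payload, attempts_per_provider=2,
                             base_delay=0.5, sleep=asyncio.sleep):
    """Try each (name, async callable) in order, backing off between retries."""
    last_error = None
    for name, call in providers:
        for attempt in range(attempts_per_provider):
            try:
                return name, await call(payload)
            except ProviderError as exc:
                last_error = exc
                # Back off only if this provider gets another attempt.
                if attempt < attempts_per_provider - 1:
                    await sleep(base_delay * (2 ** attempt))
    raise ProviderError(f"all providers failed: {last_error}")


async def _demo():
    async def flaky(payload):
        raise ProviderError("primary down")

    async def healthy(payload):
        return {"ok": True}

    return await call_with_failover(
        [("openai", flaky), ("anthropic", healthy)], {"messages": []}, base_delay=0.01
    )


print(asyncio.run(_demo()))
```

Failing over is only safe when the request is idempotent from the caller's point of view and the alternate model is on the tenant's allowlist.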

Performance tips
– Reuse HTTP clients when streaming; consider httpx.AsyncClient lifespan.
– Compress responses via Nginx gzip. Set reasonable timeouts.
– Cache static system prompts by hash to reduce payload size.
– For high volume, move UsageEvent writes to a queue (Celery) with buffered inserts.

Security checklist
– Per-tenant JWT secrets, short token TTLs.
– CORS locked to your WP origins.
– Validate model names against an allowlist.
– Strip PII if relaying user content to third parties where required.
– Rotate secrets regularly and audit admin access.

Next steps
– Add embeddings and RAG endpoints.
– Implement function/tool calling with a registry and execution sandbox.
– Expose batch endpoints with async job status and webhooks.

Queueing AI Content Jobs from WordPress to Django + Celery (OpenAI, Redis, HMAC, Callback)

This tutorial shows how to offload AI work from WordPress to a Python backend with Celery/Redis. WordPress signs a job request, Django verifies and enqueues, Celery calls OpenAI (or any compatible endpoint), then posts results back to WordPress.

What you’ll get
– A minimal WP plugin that sends signed jobs and receives callbacks
– A Django microservice with an HMAC-protected endpoint
– A Celery worker using Redis, with retries and rate limits
– A secure callback to update WP posts

Architecture
– WordPress (PHP): Button/cron triggers job -> signed POST -> Django /jobs
– Django API: Verify HMAC -> enqueue Celery task
– Celery + Redis: Run LLM call -> POST results to WP REST API
– WordPress: Update post or meta; show status

Prerequisites
– WordPress 6.x with Application Passwords enabled
– Python 3.11+, Django 5.x, Celery 5.x, Redis
– OpenAI (or compatible) API key
– Public HTTPS endpoints (ngrok for local) or reverse proxy

1) WordPress: minimal plugin to send jobs
Create wp-content/plugins/ai-job-queue/ai-job-queue.php

<?php
/**
* Plugin Name: AI Job Queue (Django)
* Description: Sends signed AI jobs to a Django queue and receives callbacks.
* Version: 0.1.0
*/
if (!defined('ABSPATH')) exit;

add_action('admin_menu', function () {
    add_menu_page('AI Jobs', 'AI Jobs', 'edit_posts', 'ai-jobs', 'ai_jobs_page');
});

function ai_jobs_page() {
    if (!current_user_can('edit_posts')) wp_die('No access');
    if (isset($_POST['ai_enqueue']) && check_admin_referer('ai_jobs_nonce')) {
        $post_id = intval($_POST['post_id']);
        $resp = ai_send_job($post_id);
        echo '<div class="notice notice-success"><p>Queued: ' . esc_html($resp) . '</p></div>';
    }
    ?>
    <div class="wrap">
        <h1>AI Job Queue</h1>
        <form method="post">
            <?php wp_nonce_field('ai_jobs_nonce'); ?>
            <input type="number" name="post_id" placeholder="Post ID" required />
            <button type="submit" name="ai_enqueue" value="1" class="button button-primary">Generate Summary</button>
        </form>
    </div>
    <?php
}

function ai_send_job($post_id) {
    $post = get_post($post_id);
    if (!$post) return 'Post not found';
    $secret = getenv('AI_JOBS_HMAC_SECRET');
    if (!$secret) return 'Missing AI_JOBS_HMAC_SECRET';

    $body = array(
        'post_id' => $post_id,
        'title' => $post->post_title,
        'content' => wp_strip_all_tags($post->post_content),
        'callback_url' => home_url('/wp-json/ai-jobs/v1/callback'),
        'timestamp' => time(),
    );
    $json = wp_json_encode($body); // sign and send the same bytes

    $endpoint = getenv('DJANGO_JOBS_URL') ?: 'https://django.example.com/jobs';
    $signature = base64_encode(hash_hmac('sha256', $json, $secret, true));

    $resp = wp_remote_post($endpoint, array(
        'timeout' => 15,
        'headers' => array(
            'Content-Type' => 'application/json',
            'X-AI-Signature' => $signature,
        ),
        'body' => $json,
    ));
    if (is_wp_error($resp)) return $resp->get_error_message();
    return 'OK';
}

// REST callback to accept results
add_action('rest_api_init', function () {
    register_rest_route('ai-jobs/v1', '/callback', array(
        'methods' => 'POST',
        'callback' => 'ai_jobs_callback',
        // Django authenticates with an Application Password, so require that capability here
        'permission_callback' => function () {
            return current_user_can('edit_posts');
        },
    ));
});

function ai_jobs_callback($request) {
    $data = $request->get_json_params();
    $post_id = intval($data['post_id'] ?? 0);
    $summary = $data['summary'] ?? '';
    $status = $data['status'] ?? 'unknown';

    if (!$post_id) return new WP_REST_Response(array('error' => 'missing post_id'), 400);
    if ($status === 'ok') {
        update_post_meta($post_id, '_ai_summary', wp_kses_post($summary));
        return array('ack' => true);
    }
    update_post_meta($post_id, '_ai_error', sanitize_text_field($data['error'] ?? ''));
    return array('ack' => true);
}

Notes
– Store AI_JOBS_HMAC_SECRET and DJANGO_JOBS_URL in wp-config.php:
putenv('AI_JOBS_HMAC_SECRET=CHANGEME_LONG_RANDOM');
putenv('DJANGO_JOBS_URL=https://django.example.com/jobs');

2) Django: API to receive signed jobs
Install
pip install django djangorestframework celery redis httpx python-dotenv

project/settings.py (key parts)
import os

INSTALLED_APPS = [
    'django.contrib.contenttypes',
    'rest_framework',
    'jobs',
]
ALLOWED_HOSTS = ['django.example.com']  # list your real host(s); avoid '*' in production
CSRF_TRUSTED_ORIGINS = ['https://your-wp-domain.com']
AI_JOBS_HMAC_SECRET = os.getenv('AI_JOBS_HMAC_SECRET', 'CHANGEME')
WP_CALLBACK_AUTH = os.getenv('WP_CALLBACK_AUTH')  # "user:app_password", base64-encoded later
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
OPENAI_BASE_URL = os.getenv('OPENAI_BASE_URL', 'https://api.openai.com/v1')

Celery (project/celery.py)
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
app = Celery('project')
app.conf.update(
    broker_url=os.getenv('REDIS_URL', 'redis://redis:6379/0'),
    result_backend=os.getenv('REDIS_URL', 'redis://redis:6379/0'),
    task_routes={'jobs.tasks.*': {'queue': 'ai'}},
    task_annotations={'jobs.tasks.process_job': {'rate_limit': '30/m'}},
    task_time_limit=60,
    task_soft_time_limit=50,
    broker_connection_retry_on_startup=True,
)
app.autodiscover_tasks()

jobs/apps.py
from django.apps import AppConfig

class JobsConfig(AppConfig):
    name = 'jobs'

    def ready(self):
        # Optional: uncomment once jobs/signals.py exists; importing a missing module fails at startup
        # from . import signals
        pass

jobs/urls.py
from django.urls import path
from .views import job_create
urlpatterns = [path('jobs', job_create)]

project/urls.py
from django.urls import path, include
urlpatterns = [path('', include('jobs.urls'))]

jobs/views.py
import base64, hmac, hashlib, json
from django.conf import settings
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from .tasks import process_job

def verify_sig(body_bytes, sig_header):
    mac = hmac.new(settings.AI_JOBS_HMAC_SECRET.encode(), body_bytes, hashlib.sha256).digest()
    expected = base64.b64encode(mac)
    # Compare as bytes so a non-ASCII header value cannot raise
    return hmac.compare_digest(expected, (sig_header or '').encode())

@csrf_exempt
def job_create(request):
    if request.method != 'POST':
        return JsonResponse({'error': 'method'}, status=405)
    body = request.body
    sig = request.headers.get('X-AI-Signature', '')
    if not verify_sig(body, sig):
        return JsonResponse({'error': 'bad signature'}, status=401)
    try:
        payload = json.loads(body.decode('utf-8'))
    except ValueError:
        return JsonResponse({'error': 'bad json'}, status=400)
    process_job.delay(payload)
    return JsonResponse({'queued': True})

3) Celery task: call OpenAI and callback to WP
jobs/tasks.py
import asyncio, base64
import httpx
from celery import shared_task
from django.conf import settings

def make_prompt(title, content):
    return f"Write a crisp 120-word executive summary for a blog post.\nTitle: {title}\nContent:\n{content[:4000]}"

async def call_llm_async(prompt):
    headers = {
        'Authorization': f'Bearer {settings.OPENAI_API_KEY}',
        'Content-Type': 'application/json',
    }
    async with httpx.AsyncClient(base_url=settings.OPENAI_BASE_URL, timeout=30) as client:
        r = await client.post('/chat/completions', json={
            'model': 'gpt-4o-mini',
            'messages': [{'role': 'user', 'content': prompt}],
            'temperature': 0.3,
        }, headers=headers)
        r.raise_for_status()
        data = r.json()
        return data['choices'][0]['message']['content'].strip()

def wp_basic_auth():
    # settings.WP_CALLBACK_AUTH should be "username:app_password"
    token = base64.b64encode(settings.WP_CALLBACK_AUTH.encode()).decode()
    return {'Authorization': f'Basic {token}', 'Content-Type': 'application/json'}

@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={'max_retries': 5})
def process_job(self, payload):
    post_id = payload['post_id']
    callback = payload['callback_url']
    prompt = make_prompt(payload['title'], payload['content'])
    try:
        summary = asyncio.run(call_llm_async(prompt))
        data = {'post_id': post_id, 'status': 'ok', 'summary': summary}
    except Exception as e:
        # LLM failures are reported to WordPress rather than retried
        data = {'post_id': post_id, 'status': 'error', 'error': str(e)[:300]}
    headers = wp_basic_auth()
    with httpx.Client(timeout=15) as client:
        resp = client.post(callback, json=data, headers=headers)
        resp.raise_for_status()  # a failed callback raises, which triggers the task retry
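
For a sense of what retry_backoff=5 with max_retries=5 means in practice, the sketch below computes the upper bound of each retry delay, assuming Celery's documented behavior of doubling the factor per retry and capping at retry_backoff_max (600 seconds by default). The helper name backoff_schedule is hypothetical. With retry_jitter left at its default of True, the actual delay is a random value between zero and each bound.

```python
def backoff_schedule(factor: int, max_retries: int, maximum: int = 600) -> list:
    """Upper bound in seconds for each retry delay, before jitter is applied."""
    return [min(maximum, factor * (2 ** n)) for n in range(max_retries)]


print(backoff_schedule(5, 5))
```

Note that these delays are not bounded by task_time_limit=60, which applies to each execution of the task rather than to the waiting time between retries.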

4) WordPress REST auth for callbacks
– Create an Application Password for a user with edit_posts.
– Store “username:app_password” in Django env var WP_CALLBACK_AUTH.

5) Redis, Celery, Django with Docker Compose
docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports: ['6379:6379']
  web:
    build: .
    command: gunicorn project.wsgi:application -b 0.0.0.0:8000
    environment:
      - AI_JOBS_HMAC_SECRET=CHANGEME_LONG_RANDOM
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - OPENAI_BASE_URL=https://api.openai.com/v1
      - WP_CALLBACK_AUTH=${WP_CALLBACK_AUTH}
      - REDIS_URL=redis://redis:6379/0
    depends_on: [redis]
    ports: ['8000:8000']
  worker:
    build: .
    command: celery -A project.celery.app worker -Q ai --loglevel=INFO
    environment:
      - AI_JOBS_HMAC_SECRET=CHANGEME_LONG_RANDOM
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - OPENAI_BASE_URL=https://api.openai.com/v1
      - WP_CALLBACK_AUTH=${WP_CALLBACK_AUTH}
      - REDIS_URL=redis://redis:6379/0
    depends_on: [redis, web]

Dockerfile (for Django)
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1
EXPOSE 8000

requirements.txt
django
djangorestframework
celery
redis
httpx
gunicorn
python-dotenv

6) Security hardening
– Rotate AI_JOBS_HMAC_SECRET regularly; use at least 32 random bytes.
– Enforce HTTPS and set Django ALLOWED_HOSTS and CSRF_TRUSTED_ORIGINS.
– IP allowlist WordPress -> Django if possible.
– Validate content length; cap payload sizes (e.g., Nginx client_max_body_size).
– Least-privilege WP user for Application Password.
– Log job IDs and outcomes; redact secrets.
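
One hardening step the job payload already supports is replay protection: WordPress includes a timestamp field, so the receiver can reject old requests after checking the signature. A minimal sketch, assuming a 300-second window; verify_job, sign_b64, and MAX_SKEW_SECONDS are hypothetical names rather than part of the Django view shown earlier.

```python
import base64
import hashlib
import hmac
import json
import time

MAX_SKEW_SECONDS = 300  # assumption; tighten or loosen for your clocks


def sign_b64(secret: str, body: bytes) -> str:
    digest = hmac.new(secret.encode(), body, hashlib.sha256).digest()
    return base64.b64encode(digest).decode("ascii")


def verify_job(secret: str, body: bytes, sig_header: str, now=None):
    """Return (ok, reason). Checks the signature first, then freshness."""
    expected = sign_b64(secret, body).encode()
    if not hmac.compare_digest(expected, (sig_header or "").encode()):
        return False, "bad signature"
    try:
        ts = int(json.loads(body).get("timestamp", 0))
    except (ValueError, TypeError, AttributeError):
        return False, "bad payload"
    current = time.time() if now is None else now
    if abs(current - ts) > MAX_SKEW_SECONDS:
        return False, "stale request"
    return True, None


body = json.dumps({"post_id": 1, "timestamp": int(time.time())}).encode()
print(verify_job("demo-secret", body, sign_b64("demo-secret", body)))
```

Because the timestamp sits inside the signed body, an attacker cannot refresh it without invalidating the signature.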

7) Testing quickly
– Local tunnel for Django (ngrok http 8000) and set DJANGO_JOBS_URL to that HTTPS URL.
– In WP admin -> AI Jobs, enter a Post ID and click Generate Summary.
– Confirm _ai_summary meta is populated.

8) Operations tips
– Add a “Job status” meta box in WP reading a status endpoint from Django.
– Use Celery beat for rate windows; add cache-based circuit breaker for API failures.
– Implement idempotency: include a job_id and ignore duplicate callbacks.
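
The idempotency tip can be sketched as a "seen job IDs" set with expiry. The version below is in-memory for illustration only, and SeenJobs is a hypothetical name; across multiple workers the same check is usually done in Redis with SET key value NX EX seconds, which succeeds only for the first writer.

```python
import time


class SeenJobs:
    """Remember job IDs for ttl_seconds so duplicate callbacks can be ignored."""

    def __init__(self, ttl_seconds: float = 3600, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._seen = {}  # job_id -> expiry time

    def first_time(self, job_id: str) -> bool:
        now = self._clock()
        # Drop expired entries so the dict does not grow without bound.
        self._seen = {k: exp for k, exp in self._seen.items() if exp > now}
        if job_id in self._seen:
            return False
        self._seen[job_id] = now + self._ttl
        return True


seen = SeenJobs(ttl_seconds=60)
print(seen.first_time("job-1"), seen.first_time("job-1"))
```

A callback handler would call first_time(job_id) and acknowledge without writing when it returns False.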

That’s it. You now have a production-ready pattern to move AI work off WordPress, with signatures, queueing, and a clean callback loop.

Production-Ready AI Document Pipeline: WordPress Uploads → Django Workers → Summaries Back to WP

Overview
We’ll implement an end-to-end pipeline:
– User uploads a document to WordPress Media.
– WordPress offloads the file to S3, sends a job to a Django API.
– Django downloads the file, runs OCR (if needed), chunks text, gets an LLM summary, and stores artifacts.
– Django posts results back to WordPress via a signed callback.
– WordPress saves a draft “Document Summary” post with structured fields.

Why this design
– Decoupled and fault-tolerant (Celery workers).
– Scales horizontally.
– Auditable (job records + signed callbacks).
– No long-running PHP on WordPress.

Architecture
– WordPress: tiny plugin + WP REST callback + S3 offload (plugin) + background initiation (Action Scheduler).
– Django: REST API (Django REST Framework), Celery workers, Redis broker, S3 storage, Tesseract OCR (optional), LLM provider (OpenAI or Anthropic).
– Security: HMAC-signed requests both directions, IP allowlist (optional), minimal scopes.

Prereqs
– WordPress 6.x, PHP 8.1+, Action Scheduler, WP Offload Media (or S3 Uploads).
– Django 5.x, Django REST Framework, Celery 5.x, Redis, boto3.
– S3 bucket + IAM with least-privilege.
– LLM provider API key.
– Domain + HTTPS on both services.

Step 1: WordPress plugin (initiate job + receive result)
Create wp-content/plugins/ai-doc-pipeline/ai-doc-pipeline.php

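<?php
/**
 * Plugin Name: AI Doc Pipeline
 */
if (!defined('ABSPATH')) exit;

class AIDocPipeline {
    // Placeholder values: the option key and post type slug are assumptions; pick your own.
    const OPTION = 'aidoc_settings';
    const CPT = 'doc_summary';

    public function __construct() {
        register_activation_hook(__FILE__, [$this, 'activate']);
        add_action('init', [$this, 'register_cpt']);
        add_action('add_attachment', [$this, 'on_new_media']);
    }

    public function activate() {
        $defaults = [
            'django_endpoint' => '',
            'wp_webhook_secret' => wp_generate_password(32, false),
            'django_shared_secret' => '',
        ];
        add_option(self::OPTION, $defaults);
        $this->register_cpt();
        flush_rewrite_rules();
    }

    public function register_cpt() {
        register_post_type(self::CPT, [
            'label' => 'Document Summaries',
            'public' => false,
            'show_ui' => true,
            'supports' => ['title', 'editor', 'custom-fields'],
        ]);
    }

    private function get_settings() {
        return get_option(self::OPTION, []);
    }

    // On new media, enqueue a background action to notify Django
    public function on_new_media($attachment_id) {
        $mime = get_post_mime_type($attachment_id);
        if (!preg_match('/(pdf|msword|officedocument|image|plain|rtf)/', $mime)) return;

        if (!class_exists('ActionScheduler')) return; // require Action Scheduler
        as_enqueue_async_action('aidoc_send_to_django', ['attachment_id' => $attachment_id], 'aidoc');
    }
}

new AIDocPipeline();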

// Background action
add_action(‘aidoc_send_to_django’, function($attachment_id) {
$settings = get_option(AIDocPipeline::OPTION, []);
$endpoint = rtrim($settings[‘django_endpoint’] ?? ”, ‘/’);
$secret = $settings[‘django_shared_secret’] ?? ”;
if (!$endpoint || !$secret) return;

$url = wp_get_attachment_url($attachment_id);
if (!$url) return;

$payload = [
‘attachment_id’ => $attachment_id,
‘file_url’ => $url,
‘site’ => get_bloginfo(‘name’),
‘callback_url’ => rest_url(‘aidoc/v1/result’),
‘ts’ => time(),
];
$body = wp_json_encode($payload);
$sig = base64_encode(hash_hmac(‘sha256’, $body, $secret, true));

wp_remote_post($endpoint . ‘/api/v1/jobs’, [
‘headers’ => [
‘Content-Type’ => ‘application/json’,
‘X-AI-Signature’ => $sig,
],
‘body’ => $body,
‘timeout’ => 20,
]);
}, 10, 1);

// REST route to accept result
add_action(‘rest_api_init’, function() {
register_rest_route(‘aidoc/v1’, ‘/result’, [
‘methods’ => ‘POST’,
‘callback’ => function(WP_REST_Request $req) {
$settings = get_option(AIDocPipeline::OPTION, []);
$secret = $settings[‘wp_webhook_secret’] ?? ”;
$raw = $req->get_body();
$sig = $req->get_header(‘x-ai-signature’) ?? ”;
$ts = intval($req->get_header(‘x-ai-timestamp’) ?? ‘0’);

if (abs(time() - $ts) > 300) return new WP_Error('stale', 'Stale request', ['status' => 401]);

$calc = base64_encode(hash_hmac(‘sha256’, $raw, $secret, true));
if (!hash_equals($calc, $sig)) return new WP_Error(‘bad_sig’, ‘Invalid signature’, [‘status’ => 401]);

$data = json_decode($raw, true);
if (!$data) return new WP_Error(‘bad_json’, ‘Invalid JSON’, [‘status’ => 400]);

// Create or update summary post
$post_id = wp_insert_post([
‘post_type’ => AIDocPipeline::CPT,
‘post_status’ => ‘draft’,
‘post_title’ => sanitize_text_field($data[‘title’] ?? ‘Untitled Summary’),
‘post_content’ => wp_kses_post($data[‘summary_markdown’] ?? ”),
]);

if ($post_id && !is_wp_error($post_id)) {
update_post_meta($post_id, ‘_source_attachment_id’, intval($data[‘attachment_id’] ?? 0));
update_post_meta($post_id, ‘_word_count’, intval($data[‘word_count’] ?? 0));
update_post_meta($post_id, ‘_processing_ms’, intval($data[‘processing_ms’] ?? 0));
update_post_meta($post_id, ‘_chunks’, intval($data[‘chunks’] ?? 0));
update_post_meta($post_id, ‘_s3_text_key’, sanitize_text_field($data[‘s3_text_key’] ?? ”));
update_post_meta($post_id, ‘_job_id’, sanitize_text_field($data[‘job_id’] ?? ”));
}

return [‘ok’ => true, ‘post_id’ => $post_id];
},
‘permission_callback’ => ‘__return_true’,
]);
});

Notes:
– Install “Action Scheduler” plugin.
– Configure settings via wp option update or a small admin page (omitted for brevity).
– Ensure media offloading to S3 is active so Django can fetch public or presigned URLs.

Step 2: Django project setup
Create a Django project with DRF and Celery.

pip install django djangorestframework celery redis boto3 pydantic python-dotenv django-environ httpx pypdf pytesseract Pillow pdfminer.six fastembed

settings.py (key lines)
– Add rest_framework to INSTALLED_APPS.
– Configure Redis URL and Celery.
– Store secrets via env.

import environ  # django-environ provides env() and env.list()
env = environ.Env()

CELERY_BROKER_URL = env("REDIS_URL")
CELERY_RESULT_BACKEND = env("REDIS_URL")
AI_SHARED_SECRET = env("AI_SHARED_SECRET")  # matches WordPress django_shared_secret
WP_CALLBACK_SECRET = env("WP_CALLBACK_SECRET")  # matches WordPress wp_webhook_secret
WP_ALLOWED_CALLBACKS = env.list("WP_ALLOWED_CALLBACKS", default=[])  # optional IPs

S3 config:
AWS_ACCESS_KEY_ID = env(“AWS_ACCESS_KEY_ID”)
AWS_SECRET_ACCESS_KEY = env(“AWS_SECRET_ACCESS_KEY”)
AWS_STORAGE_BUCKET_NAME = env(“AWS_STORAGE_BUCKET_NAME”)
AWS_S3_REGION_NAME = env(“AWS_S3_REGION_NAME”)

celery.py
from __future__ import annotations
import os
from celery import Celery

os.environ.setdefault(“DJANGO_SETTINGS_MODULE”, “core.settings”)
app = Celery(“core”)
app.config_from_object(“django.conf:settings”, namespace=”CELERY”)
app.autodiscover_tasks()

urls.py
from django.urls import path
from api.views import JobView

urlpatterns = [path(“api/v1/jobs”, JobView.as_view())]

Step 3: Django API view (verify HMAC and enqueue task)
api/views.py
import base64, hmac, hashlib, time, uuid, json
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.conf import settings
from .tasks import process_document

def verify_sig(raw: bytes, sig: str, secret: str) -> bool:
    calc = base64.b64encode(hmac.new(secret.encode(), raw, hashlib.sha256).digest()).decode()
    return hmac.compare_digest(calc, sig or "")

class JobView(APIView):
    # No DRF auth classes: the HMAC signature is the authentication
    authentication_classes = []
    permission_classes = []

    def post(self, request):
        raw = request.body
        sig = request.headers.get("X-AI-Signature", "")
        if not verify_sig(raw, sig, settings.AI_SHARED_SECRET):
            return Response({"error": "bad signature"}, status=status.HTTP_401_UNAUTHORIZED)

        data = request.data
        job_id = str(uuid.uuid4())
        process_document.delay(job_id, data)
        return Response({"ok": True, "job_id": job_id})

Step 4: Celery task (download, OCR/text, chunk, summarize, store, callback)
api/tasks.py
import time, io, os, base64, hmac, hashlib, json
import httpx, boto3
from django.conf import settings
from celery import shared_task
from pydantic import BaseModel
from pdfminer.high_level import extract_text as pdf_text
from pypdf import PdfReader
from PIL import Image
import pytesseract

class JobPayload(BaseModel):
attachment_id: int
file_url: str
site: str
callback_url: str
ts: int

def hmac_b64(secret: str, body: bytes) -> str:
    return base64.b64encode(hmac.new(secret.encode(), body, hashlib.sha256).digest()).decode()

def extract_from_pdf(data: bytes) -> str:
    try:
        return pdf_text(io.BytesIO(data))
    except Exception:
        # Fallback OCR on each page if the text layer is missing.
        # Placeholder: pages are not rasterized here, so this returns an empty
        # string; for production, render each page to an image and OCR it.
        txt = []
        reader = PdfReader(io.BytesIO(data))
        for page in reader.pages:
            pass
        return "\n".join(txt)

def extract_from_image(data: bytes) -> str:
    img = Image.open(io.BytesIO(data)).convert("RGB")
    return pytesseract.image_to_string(img)

async def fetch_bytes(url: str) -> bytes:
    async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
        r = await client.get(url)
        r.raise_for_status()
        return r.content

def chunk_text(text: str, size=1200, overlap=150):
    # Word-based sliding window: each chunk repeats the last `overlap` words of the previous one
    tokens = text.split()
    out = []
    i = 0
    while i < len(tokens):
        out.append(" ".join(tokens[i:i + size]))
        i += size - overlap
    return out

def response_text(data: dict) -> str:
    # The raw Responses API JSON nests text under output[].content[];
    # a top-level "output_text" is a convenience of the official SDK only.
    parts = []
    for item in data.get("output", []):
        for c in item.get("content") or []:
            if c.get("type") == "output_text":
                parts.append(c.get("text", ""))
    return "".join(parts).strip()

async def llm_summarize(chunks: list[str]) -> str:
    # Replace with your provider; this example calls the OpenAI Responses API via httpx
    api_key = os.environ.get("OPENAI_API_KEY")
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    system = "You are a technical summarizer. Produce a concise, structured summary with headings and bullet points. Keep code blocks when useful."
    all_sections = []
    async with httpx.AsyncClient(timeout=60.0) as client:
        for idx, ch in enumerate(chunks):
            payload = {
                "model": "gpt-4o-mini",
                "input": [
                    {"role": "system", "content": system},
                    {"role": "user", "content": f"Part {idx+1}/{len(chunks)}:\n{ch}\nSummarize key points."}
                ]
            }
            r = await client.post("https://api.openai.com/v1/responses", json=payload, headers=headers)
            r.raise_for_status()
            all_sections.append(response_text(r.json()))
        # Final pass
        final_prompt = "Combine these partial summaries into one cohesive markdown summary with sections, bullets, and a short executive overview:\n" + "\n\n".join(all_sections)
        payload = {"model": "gpt-4o-mini", "input": [{"role": "user", "content": final_prompt}]}
        r = await client.post("https://api.openai.com/v1/responses", json=payload, headers=headers)
        r.raise_for_status()
        return response_text(r.json())

import asyncio  # runs the async helpers above from the synchronous Celery task

@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def process_document(self, job_id: str, data: dict):
    t0 = time.time()
    payload = JobPayload(**data)
    # 1) Download file
    content = asyncio.run(fetch_bytes(payload.file_url))
    # 2) Extract text (match on the URL path so presigned-URL query strings are ignored)
    path = payload.file_url.lower().split("?", 1)[0]
    if path.endswith(".pdf"):
        text = extract_from_pdf(content)
    elif path.endswith((".png", ".jpg", ".jpeg", ".tif", ".tiff", ".webp")):
        text = extract_from_image(content)
    else:
        text = content.decode("utf-8", errors="ignore")
    text = (text or "").strip()
    if not text:
        raise self.retry(exc=RuntimeError("No text extracted"))

    # 3) Chunk + summarize
    chunks = chunk_text(text)
    summary_md = asyncio.run(llm_summarize(chunks))

    # 4) Store raw text to S3
    s3 = boto3.client("s3", region_name=settings.AWS_S3_REGION_NAME,
                      aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
                      aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY)
    s3_key = f"processed/{job_id}/text.txt"
    s3.put_object(Bucket=settings.AWS_STORAGE_BUCKET_NAME, Key=s3_key, Body=text.encode("utf-8"), ContentType="text/plain")

    # 5) Callback to WordPress (signed over the exact bytes that are sent)
    body = json.dumps({
        "job_id": job_id,
        "attachment_id": payload.attachment_id,
        "title": f"Summary of {payload.attachment_id}",
        "summary_markdown": summary_md,
        "word_count": len(text.split()),
        "processing_ms": int((time.time() - t0) * 1000),
        "chunks": len(chunks),
        "s3_text_key": s3_key
    })
    sig = hmac_b64(settings.WP_CALLBACK_SECRET, body.encode())
    headers = {
        "Content-Type": "application/json",
        "X-AI-Signature": sig,
        "X-AI-Timestamp": str(int(time.time()))
    }

    with httpx.Client(timeout=20.0) as client:
        r = client.post(payload.callback_url, content=body, headers=headers)
        r.raise_for_status()
    return {"ok": True, "job_id": job_id}

Notes:
– httpx has no run() helper; use asyncio.run() to run the async coroutines from the synchronous Celery task, or switch the helpers to the synchronous httpx.Client.
– Replace the OpenAI call with your provider/model as needed.
– For production OCR on PDFs, integrate a robust rasterizer (e.g., pdf2image + poppler).

Step 5: Environment and run
.env (Django)
REDIS_URL=redis://localhost:6379/0
AI_SHARED_SECRET=replace-with-strong-random
WP_CALLBACK_SECRET=replace-with-strong-random
AWS_ACCESS_KEY_ID=…
AWS_SECRET_ACCESS_KEY=…
AWS_STORAGE_BUCKET_NAME=…
AWS_S3_REGION_NAME=us-west-2
OPENAI_API_KEY=…

WordPress options (match Django)
– django_shared_secret = AI_SHARED_SECRET
– wp_webhook_secret = WP_CALLBACK_SECRET
– django_endpoint = https://api.your-domain.com

Run
– Redis: redis-server
– Django: python manage.py runserver 0.0.0.0:8000 (behind nginx/gunicorn in prod)
– Celery: celery -A core.celery:app worker -l info -Q default -c 4

Step 6: Security hardening
– Only accept signed requests. Rotate secrets periodically.
– Validate file types and size limits on WordPress.
– Consider presigned S3 URLs instead of public media.
– Add IP allowlists on the WordPress callback route at the web server level.
– Enforce 5-minute freshness on timestamps (already in WP receiver).
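
The signing and freshness rules above can be sketched on the Python side as follows. This is a minimal illustration under stated assumptions: `sign`, `verify`, and `MAX_SKEW_SECONDS` are hypothetical names, and the signature covers only the request body, mirroring the WordPress receiver shown earlier.

```python
import base64
import hashlib
import hmac
import time

MAX_SKEW_SECONDS = 300  # the 5-minute freshness window


def sign(secret: str, body: bytes) -> str:
    """Base64-encoded HMAC-SHA256 over the raw request body."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).digest()
    return base64.b64encode(digest).decode()


def verify(secret: str, body: bytes, signature: str, timestamp: int, now=None) -> bool:
    """Reject stale requests first, then compare signatures in constant time."""
    now = time.time() if now is None else now
    if abs(now - timestamp) > MAX_SKEW_SECONDS:
        return False
    return hmac.compare_digest(sign(secret, body), signature or "")


body = b'{"job_id":"1"}'
ok = verify("shared-secret", body, sign("shared-secret", body), int(time.time()))
```

One design point to consider: because the timestamp travels in a header and is not part of the signed bytes, a replayed body can be sent with a fresh timestamp. Signing the timestamp together with the body would close that gap.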

Step 7: Observability
– Add job table in Django to persist status, errors, and durations.
– Emit structured logs (JSON) and correlate by job_id.
– Track cost: store chunk counts and prompt tokens if your provider exposes them.
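
As a rough sketch of the structured-logging tip, the standard library alone can emit one JSON object per log line with a `job_id` field for correlation. The names `JsonFormatter`, `make_logger`, and the logger name `aidoc.jobs` are assumptions for illustration, not part of the pipeline code above.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "level": record.levelname,
            "message": record.getMessage(),
            # job_id arrives through the `extra` argument of the logging call
            "job_id": getattr(record, "job_id", None),
        }
        return json.dumps(entry)


def make_logger(stream) -> logging.Logger:
    logger = logging.getLogger("aidoc.jobs")
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


buffer = io.StringIO()
log = make_logger(buffer)
log.info("summary stored", extra={"job_id": "job-123"})
```

Inside the Celery task you would pass the task's `job_id` in `extra` on every call so that a single search finds all lines for one job.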

Step 8: Performance tips
– Use batch Celery queues for large backlogs; set concurrency based on CPU.
– Cache OCR results for identical files (hash by SHA256).
– Use streaming uploads to S3; avoid loading large files fully in memory.
– Switch to embeddings + extractive summarization for very long docs to reduce LLM cost.
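
The "cache OCR results for identical files" tip amounts to keying the expensive step by a hash of the file bytes. A minimal sketch, assuming an in-memory dict where a real deployment would use Redis or S3; `ContentCache` and `fake_ocr` are hypothetical names:

```python
import hashlib
from typing import Callable, Dict


class ContentCache:
    """Caches the result of an expensive function keyed by the SHA-256 of its input bytes."""

    def __init__(self):
        self._store: Dict[str, str] = {}

    def get_or_compute(self, data: bytes, compute: Callable[[bytes], str]) -> str:
        key = hashlib.sha256(data).hexdigest()
        if key not in self._store:
            self._store[key] = compute(data)  # only runs on a cache miss
        return self._store[key]


calls = []


def fake_ocr(data: bytes) -> str:
    # Stand-in for pytesseract or a PDF text extractor
    calls.append(1)
    return data.decode("utf-8").upper()


cache = ContentCache()
a = cache.get_or_compute(b"same file", fake_ocr)
b = cache.get_or_compute(b"same file", fake_ocr)  # served from cache
```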

Minimal test with curl
– Simulate WordPress → Django:
curl -X POST https://api.your-domain.com/api/v1/jobs \
  -H "Content-Type: application/json" \
  -H "X-AI-Signature: $(printf '%s' '{"attachment_id":1,"file_url":"https://…/sample.pdf","site":"Test","callback_url":"https://wp.site.com/wp-json/aidoc/v1/result","ts":123}' | openssl dgst -sha256 -hmac 'AI_SHARED_SECRET' -binary | openssl base64)" \
  -d '{"attachment_id":1,"file_url":"https://…/sample.pdf","site":"Test","callback_url":"https://wp.site.com/wp-json/aidoc/v1/result","ts":123}'

– Simulate Django → WordPress:
curl -X POST https://wp.site.com/wp-json/aidoc/v1/result \
  -H "Content-Type: application/json" \
  -H "X-AI-Timestamp: $(date +%s)" \
  -H "X-AI-Signature: $(printf '%s' '{"job_id":"1","attachment_id":1,"title":"t","summary_markdown":"s"}' | openssl dgst -sha256 -hmac 'WP_CALLBACK_SECRET' -binary | openssl base64)" \
  -d '{"job_id":"1","attachment_id":1,"title":"t","summary_markdown":"s"}'
(The timestamp must be current: the receiver rejects anything older than 5 minutes.)

What you get
– Automated, resilient summarization pipeline.
– Clear security boundaries with HMAC.
– A reusable pattern for any long-running AI task initiated from WordPress.

Build a Production RAG API for WordPress Content with Django + pgvector + OpenAI

Overview
We’ll build a small, production-ready RAG service that:
– Pulls WordPress posts via REST
– Cleans, chunks, and embeds content with OpenAI
– Stores vectors in Postgres using pgvector
– Serves /search and /ask endpoints via Django
– Caches responses in Redis and rate-limits clients
– Ships with Docker for easy deploy

Stack
– Python 3.11, Django 5, DRF
– Postgres 16 + pgvector
– Redis (cache/rate limit)
– OpenAI embeddings + responses
– Docker + Gunicorn

Architecture
– Ingest worker: WP REST -> Clean -> Chunk -> Embed -> Upsert to Postgres
– API service:
– /search: vector similarity over pgvector
– /ask: retrieve top chunks -> prompt -> LLM -> cache
– Redis: cache answers and throttle
– Postgres: source of truth for docs/chunks
– Reverse proxy/CDN: Cloudflare (optional)

1) Create database with pgvector
SQL (run once):
CREATE EXTENSION IF NOT EXISTS vector;

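-- documents table
CREATE TABLE documents (
  id UUID PRIMARY KEY,
  wp_id INTEGER UNIQUE,
  slug TEXT,
  title TEXT,
  url TEXT,
  updated_at TIMESTAMPTZ,
  checksum TEXT
);

-- chunks table
CREATE TABLE chunks (
  id UUID PRIMARY KEY,
  doc_id UUID REFERENCES documents(id) ON DELETE CASCADE,
  ord INTEGER,
  content TEXT,
  embedding vector(3072), -- must match your embedding dimension
  token_count INTEGER
);

-- index for fast ANN search
-- Caution: pgvector's ivfflat (and hnsw) indexes on vector columns support at most
-- 2,000 dimensions, so this statement is rejected for vector(3072). Either declare
-- vector(1536) and embed at 1536 dimensions (text-embedding-3-small), or index a
-- halfvec cast (pgvector 0.7+).
CREATE INDEX ON chunks USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);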

2) Django project setup
pip install django djangorestframework psycopg[binary,pool] redis python-dotenv pydantic openai tiktoken html2text

settings.py (key parts):
– Configure DATABASES for Postgres.
– Configure CACHES for Redis.
– Add REST_FRAMEWORK.
– Load OPENAI_API_KEY from env.
– Add simple header auth.

Example env:
OPENAI_API_KEY=sk-…
DATABASE_URL=postgresql://app:pass@db:5432/app
REDIS_URL=redis://redis:6379/0
ALLOWED_HOSTS=*

3) Models (lightweight)
We’ll use raw SQL for vector ops; Django models stay simple.

app/models.py:
from django.db import models
import uuid

class Document(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
wp_id = models.IntegerField(unique=True)
slug = models.TextField()
title = models.TextField()
url = models.TextField()
updated_at = models.DateTimeField()
checksum = models.TextField()

class Chunk(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
doc = models.ForeignKey(Document, on_delete=models.CASCADE)
ord = models.IntegerField()
content = models.TextField()
embedding = models.BinaryField(null=True) # store in pgvector via SQL, not ORM
token_count = models.IntegerField()

Note: We’ll write embeddings via raw SQL to the vector column.

4) WordPress ingestion
– Pull posts from /wp-json/wp/v2/posts
– Clean HTML -> text
– Chunk ~800 tokens with 100-token overlap
– Embed with text-embedding-3-large (3072 dims) or -small (1536 dims)
– Upsert documents and chunks

ingest.py:
import os, math, hashlib, time, uuid, requests
from datetime import datetime, timezone
import html2text
import tiktoken
import psycopg
from openai import OpenAI

WP_URL = os.getenv(“WP_URL”) # e.g., https://your-site.com
OPENAI_API_KEY = os.getenv(“OPENAI_API_KEY”)
DATABASE_URL = os.getenv(“DATABASE_URL”)

EMBED_MODEL = “text-embedding-3-large” # dimension 3072
CHUNK_TOKENS = 800
OVERLAP = 100

client = OpenAI(api_key=OPENAI_API_KEY)
tokenizer = tiktoken.get_encoding(“cl100k_base”)
h2t = html2text.HTML2Text()
h2t.ignore_links = False
h2t.body_width = 0

def fetch_posts():
page = 1
posts = []
while True:
r = requests.get(f”{WP_URL}/wp-json/wp/v2/posts”, params={“per_page”: 100, “page”: page}, timeout=30)
if r.status_code == 400 and “rest_post_invalid_page_number” in r.text:
break
r.raise_for_status()
batch = r.json()
if not batch:
break
posts.extend(batch)
page += 1
return posts

def clean_text(html, title):
    text = h2t.handle(html or "")
    full = f"# {title}\n\n{text}".strip()
    return " ".join(full.split())

def chunk_text(txt):
    ids = tokenizer.encode(txt)
    chunks = []
    start = 0
    ord_i = 0
    while start < len(ids):
        end = min(start + CHUNK_TOKENS, len(ids))
        chunks.append((ord_i, tokenizer.decode(ids[start:end])))
        ord_i += 1
        if end == len(ids):
            break
        # step back by OVERLAP tokens, but always move forward
        start = end - OVERLAP if end - OVERLAP > start else end
    return chunks

def sha1(s): return hashlib.sha1(s.encode("utf-8")).hexdigest()

def embed_texts(texts):
    resp = client.embeddings.create(model=EMBED_MODEL, input=texts)
    return [e.embedding for e in resp.data]

def vector_literal(emb):
    # pgvector accepts the text form '[x1,x2,...]' cast to vector
    return "[" + ",".join(str(x) for x in emb) + "]"

def is_unchanged(conn, post):
    # Compare against the stored checksum BEFORE upserting. Checking the value
    # returned by the upsert itself would always match and skip every document.
    row = conn.execute("SELECT checksum FROM documents WHERE wp_id=%s", (post["id"],)).fetchone()
    return row is not None and row[0] == sha1(post["content"]["rendered"])

def upsert(conn, post, chunks, embeddings):
    wp_id = post["id"]
    slug = post["slug"]
    title = post["title"]["rendered"]
    url = post["link"]
    updated = datetime.fromisoformat(post["modified_gmt"] + "+00:00")
    checksum = sha1(post["content"]["rendered"])

    cur = conn.cursor()
    cur.execute("""
        INSERT INTO documents (id, wp_id, slug, title, url, updated_at, checksum)
        VALUES (%s,%s,%s,%s,%s,%s,%s)
        ON CONFLICT (wp_id) DO UPDATE
        SET slug=EXCLUDED.slug, title=EXCLUDED.title, url=EXCLUDED.url,
            updated_at=EXCLUDED.updated_at, checksum=EXCLUDED.checksum
        RETURNING id
    """, (uuid.uuid4(), wp_id, slug, title, url, updated, checksum))
    doc_id = cur.fetchone()[0]

    # Replace chunks for this doc
    cur.execute("DELETE FROM chunks WHERE doc_id=%s", (doc_id,))
    for (ord_i, content), emb in zip(chunks, embeddings):
        cur.execute("""
            INSERT INTO chunks (id, doc_id, ord, content, embedding, token_count)
            VALUES (%s,%s,%s,%s,%s::vector,%s)
        """, (uuid.uuid4(), doc_id, ord_i, content, vector_literal(emb), len(tokenizer.encode(content))))
    conn.commit()

def run():
    posts = fetch_posts()
    with psycopg.connect(DATABASE_URL) as conn:
        for p in posts:
            # If content is unchanged, skip re-chunking and re-embedding
            if is_unchanged(conn, p):
                continue
            title = p["title"]["rendered"]
            txt = clean_text(p["content"]["rendered"], title)
            chunks = chunk_text(txt)
            texts = [c[1] for c in chunks]
            # Batch embed for throughput, then write the whole document once
            embs = []
            for i in range(0, len(texts), 64):
                embs.extend(embed_texts(texts[i:i+64]))
                time.sleep(0.2)
            upsert(conn, p, chunks, embs)

if __name__ == “__main__”:
run()

5) Vector search (pgvector SQL)
We’ll use cosine distance. Note: vector dimension must match the model.

search.py (utility):
import psycopg, os
DATABASE_URL = os.getenv(“DATABASE_URL”)

def top_k(query_emb, k=8):
    # pgvector accepts the text form '[x1,x2,...]' cast to vector
    q = "[" + ",".join(str(x) for x in query_emb) + "]"
    with psycopg.connect(DATABASE_URL) as conn:
        cur = conn.cursor()
        # <=> is pgvector's cosine distance operator; 1 - distance gives similarity
        cur.execute("""
            SELECT c.content, d.title, d.url, 1 - (c.embedding <=> %s::vector) AS score
            FROM chunks c
            JOIN documents d ON d.id = c.doc_id
            ORDER BY c.embedding <=> %s::vector
            LIMIT %s
        """, (q, q, k))
        rows = cur.fetchall()
    return [{"content": r[0], "title": r[1], "url": r[2], "score": float(r[3])} for r in rows]
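
To make the scoring concrete, here is a small pure-Python sketch of what cosine distance computes and how "1 minus distance" becomes the score. It is only an illustration of the math; `cosine_distance` and `rank` are hypothetical helpers, and in the service itself pgvector does this work inside Postgres.

```python
import math
from typing import List, Sequence, Tuple


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """1 - cos(angle between a and b); 0 means same direction, 1 means orthogonal."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def rank(query: Sequence[float], items: List[Tuple[str, Sequence[float]]], k: int):
    """Return the k closest items as (label, score), where score = 1 - distance."""
    scored = [(label, 1.0 - cosine_distance(query, vec)) for label, vec in items]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


items = [("pricing", [1.0, 0.0]), ("careers", [0.0, 1.0]), ("plans", [0.9, 0.1])]
top = rank([1.0, 0.0], items, k=2)
```

Ordering by ascending distance and ordering by descending score give the same ranking, which is why the SQL sorts on the distance expression directly.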

6) Django API endpoints
– Header auth: X-API-Key
– /search: embed query -> vector search
– /ask: retrieve top chunks -> prompt LLM -> cache

auth.py (middleware):
from django.http import JsonResponse
import os, time, redis, hashlib

API_KEY = os.getenv("API_KEY")
r = redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379/0"))

class SimpleAuthRateLimit:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        key = request.headers.get("X-API-Key")
        if not key or key != API_KEY:
            return JsonResponse({"error": "unauthorized"}, status=401)
        # rate limit: 60 req/min per key (fixed one-minute window)
        bucket = f"rl:{key}:{int(time.time()//60)}"
        cnt = r.incr(bucket)
        if cnt == 1:
            r.expire(bucket, 70)
        if cnt > 60:
            return JsonResponse({"error": "rate_limited"}, status=429)
        return self.get_response(request)

settings.py: add middleware path to MIDDLEWARE.
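
The fixed-window counting used by the middleware can be seen in isolation in the sketch below. It replaces Redis with a plain dict so the logic is easy to follow; `FixedWindowLimiter` is a hypothetical class written for illustration, not the middleware itself.

```python
class FixedWindowLimiter:
    """Counts requests per (key, window number); allows up to `limit` per window."""

    def __init__(self, limit: int = 60, window_seconds: int = 60):
        self.limit = limit
        self.window_seconds = window_seconds
        self._counts = {}

    def allow(self, key: str, now: float) -> bool:
        # Same bucket scheme as the middleware: integer division of the clock
        bucket = (key, int(now // self.window_seconds))
        self._counts[bucket] = self._counts.get(bucket, 0) + 1
        return self._counts[bucket] <= self.limit


limiter = FixedWindowLimiter(limit=2, window_seconds=60)
results = [limiter.allow("client-a", t) for t in (0, 1, 2, 60)]
```

A known trade-off of fixed windows is that a client can send up to twice the limit in a short burst that straddles a window boundary; a sliding window or token bucket avoids that at the cost of more state.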

views.py:
import os, json, hashlib
from django.views.decorators.csrf import csrf_exempt
from django.http import JsonResponse
from openai import OpenAI
from .search import top_k
import redis

client = OpenAI(api_key=os.getenv(“OPENAI_API_KEY”))
cache = redis.from_url(os.getenv(“REDIS_URL”))

def embed(q):
return client.embeddings.create(model=”text-embedding-3-large”, input=q).data[0].embedding

def answer_with_context(query, contexts):
    system = "You are a concise assistant. Use only the provided context. Cite URLs when relevant."
    # Up to 6 chunks, each truncated to 1200 characters, separated by a rule
    ctx = "\n\n---\n\n".join([c["content"][:1200] for c in contexts[:6]])
    prompt = f"Context:\n{ctx}\n\nQuestion: {query}\nAnswer concisely. Include source titles and URLs if helpful."
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "system", "content": system}, {"role": "user", "content": prompt}],
        temperature=0.2,
    )
    return resp.choices[0].message.content

@csrf_exempt
def search_view(request):
if request.method != “POST”:
return JsonResponse({“error”:”POST only”}, status=405)
body = json.loads(request.body.decode(“utf-8”))
q = body.get(“q”,””).strip()
if not q: return JsonResponse({“error”:”missing q”}, status=400)
emb = embed(q)
hits = top_k(emb, k=8)
return JsonResponse({“results”: hits})

@csrf_exempt
def ask_view(request):
if request.method != “POST”:
return JsonResponse({“error”:”POST only”}, status=405)
body = json.loads(request.body.decode(“utf-8”))
q = body.get(“q”,””).strip()
if not q: return JsonResponse({“error”:”missing q”}, status=400)
emb = embed(q)
hits = top_k(emb, k=8)
# cache key: query + top doc urls
key_src = q + “|” + “|”.join([h[“url”] for h in hits[:6]])
ck = “ans:” + hashlib.sha1(key_src.encode()).hexdigest()
cached = cache.get(ck)
if cached:
return JsonResponse({“answer”: cached.decode(“utf-8”), “cached”: True, “sources”: hits[:6]})
ans = answer_with_context(q, hits)
cache.setex(ck, 3600, ans)
return JsonResponse({“answer”: ans, “cached”: False, “sources”: hits[:6]})
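
The cache-key scheme in ask_view (question text plus the URLs of the top sources) can be isolated into a small function, sketched here. `answer_cache_key` is a hypothetical helper name; the hashing mirrors the view above.

```python
import hashlib
from typing import List


def answer_cache_key(query: str, source_urls: List[str], max_sources: int = 6) -> str:
    """Derive a Redis key from the question and the URLs of the retrieved sources."""
    key_src = query + "|" + "|".join(source_urls[:max_sources])
    return "ans:" + hashlib.sha1(key_src.encode()).hexdigest()


k1 = answer_cache_key("What is the pricing?", ["https://example.com/pricing"])
k2 = answer_cache_key("What is the pricing?", ["https://example.com/pricing"])
k3 = answer_cache_key("What is the pricing?", ["https://example.com/plans"])
```

Because the retrieved URLs are part of the key, the cached answer is bypassed whenever retrieval returns a different set of documents. If a post's text changes but its URL does not, the old answer can still be served until the one-hour TTL expires.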

urls.py:
from django.urls import path
from .views import search_view, ask_view
urlpatterns = [
path(“search”, search_view),
path(“ask”, ask_view),
]

7) Dockerize
Dockerfile:
FROM python:3.11-slim
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1
RUN apt-get update && apt-get install -y build-essential libpq-dev && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["gunicorn", "project.wsgi:application", "-w", "3", "-b", "0.0.0.0:8000", "--timeout", "120"]

requirements.txt: versions matching your local install.

docker-compose.yml:
version: "3.9"
services:
  web:
    build: .
    env_file: .env
    ports: ["8000:8000"]
    depends_on: [db, redis]
  db:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_DB: app
      POSTGRES_USER: app
      POSTGRES_PASSWORD: pass
    volumes: ["pgdata:/var/lib/postgresql/data"]
  redis:
    image: redis:7
volumes:
  pgdata:

8) Security and ops
– Put Cloudflare in front. Only expose 443 to web. Block /admin if not needed.
– Use header auth (X-API-Key). Rotate keys regularly.
– Limit model output tokens to control latency/cost.
– Add health endpoint that checks DB and Redis.
– Log query, latency, hit/miss (without storing PII).
– Back up Postgres nightly. Enable WAL archiving if needed.

9) Performance tuning
– Use text-embedding-3-small for lower cost; adjust vector dims to 1536.
– Increase ivfflat lists based on data size (rule of thumb: sqrt(N)).
– Warm Redis with popular Q/A.
– Batch embeddings (32–128) to maximize throughput.
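– Add hybrid search (keyword matching via Postgres full-text search or pg_trgm alongside vector similarity) and rerank if needed. Note that pg_trgm provides trigram similarity, not BM25.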
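
The sqrt(N) rule of thumb for ivfflat lists is easy to turn into a helper you can run before rebuilding the index. This is a sketch of the arithmetic only: `suggest_lists` and its `minimum` floor are assumptions for illustration, and the right value for your data should be confirmed by measuring recall and latency.

```python
import math


def suggest_lists(row_count: int, minimum: int = 10) -> int:
    """Rule of thumb from the tuning notes: lists is about sqrt(number of rows)."""
    return max(minimum, math.isqrt(row_count))


small = suggest_lists(10_000)      # a small site
large = suggest_lists(1_000_000)   # a large archive
```

Changing lists requires recreating the index. More lists make each probe cheaper but can lower recall unless the number of probes at query time is raised as well.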

10) WordPress sync strategies
– Full reingest nightly + incremental webhook:
– Use WordPress action on post save to call /ingest webhook (custom endpoint) with post ID.
– Ingest script fetches single post and reindexes only that document.
– Store checksum to avoid redundant work.

Quick test
– Run docker compose up
– Ingest: docker compose exec web python ingest.py
– Search: curl -H "X-API-Key: YOURKEY" -X POST localhost:8000/search -d '{"q":"your topic"}' -H "Content-Type: application/json"
– Ask: curl -H "X-API-Key: YOURKEY" -X POST localhost:8000/ask -d '{"q":"Summarize pricing"}' -H "Content-Type: application/json"

What you get
– A clean RAG backend you can drop behind your site, chat widget, or internal tools.
– Real-time answers grounded in your WordPress content.
– Clear paths to scale, secure, and monitor.

Build a Production-Ready “AI Draft Writer” Pipeline: Django + Celery + OpenAI → WordPress REST

Overview
We’ll build a small, production-ready service that generates blog drafts using OpenAI and publishes them to WordPress as drafts. The stack:
– Django API for requests and auth
– Celery + Redis for background jobs and retries
– OpenAI for text generation
– WordPress REST API for post creation

You’ll get secure config, job orchestration, content sanitization, and deployment tips.

Architecture
– Client calls Django endpoint /api/drafts (JWT or token) with topic + options.
– Django enqueues Celery task draft_writer.generate_draft.
– Task calls OpenAI, sanitizes HTML/markdown, converts to WP blocks (optional), and posts draft to WordPress via wp-json/wp/v2/posts.
– Task stores run metadata and handles retries with increasing backoff.
– Optional callback/webhook to notify the client.

Prerequisites
– Python 3.10+, Django 4+, Celery 5+, Redis
– WordPress 6.x with REST enabled
– OpenAI API key
– WordPress auth: Application Passwords (fast) or OAuth2 (recommended for production)

Environment variables
Add to your .env (use a vault/secret manager in prod):
OPENAI_API_KEY=sk-…
WP_BASE_URL=https://your-site.com
WP_USER=automation-user
WP_APP_PASSWORD=abcd xyz… (from WordPress Application Passwords)
DJANGO_SECRET_KEY=…
ALLOWED_HOSTS=your-api.yourdomain.com
REDIS_URL=redis://localhost:6379/0

Django setup
Create project and app
django-admin startproject aidrafts
cd aidrafts
python manage.py startapp drafts

Install packages
pip install django djangorestframework python-dotenv requests openai==1.* celery redis bleach markdown

settings.py (key parts)
INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "rest_framework",
    "drafts",
]

REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES": [
        "rest_framework.authentication.TokenAuthentication",
    ],
    "DEFAULT_PERMISSION_CLASSES": [
        "rest_framework.permissions.IsAuthenticated",
    ],
}

CELERY_BROKER_URL = os.getenv("REDIS_URL")
CELERY_RESULT_BACKEND = os.getenv("REDIS_URL")

OpenAI client init (drafts/openai_client.py)
import os
from openai import OpenAI

# Explicit timeout so a stalled request fails fast and Celery can retry it
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), timeout=60.0)

def generate_article(topic: str, outline: list[str] | None = None, tone: str = "concise", words: int = 900) -> dict:
    system = "You are a technical writer. Produce clean, factual content for a business/engineering audience."
    user = f"Topic: {topic}\nTone: {tone}\nTarget length: {words} words.\nInclude short intro, clear sections with H2/H3, bullets where useful, and a practical checklist.\nAvoid hype."
    if outline:
        user += "\nFollow this outline:\n" + "\n".join(f"- {h}" for h in outline)
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "system", "content": system}, {"role": "user", "content": user}],
        temperature=0.4
    )
    content = resp.choices[0].message.content
    return {"content": content}

Sanitization and format helpers (drafts/format.py)
import bleach
import markdown as md

# set(...) works whether bleach exposes ALLOWED_TAGS as a list or a frozenset
ALLOWED_TAGS = set(bleach.sanitizer.ALLOWED_TAGS) | {"p","h2","h3","h4","ul","ol","li","pre","code","strong","em","a","blockquote"}
ALLOWED_ATTRS = {"a": ["href","title","rel","target"], "code": ["class"]}

def to_html(text: str) -> str:
    # Part of this function was lost in extraction. This reconstruction assumes:
    # pass HTML through, render Markdown otherwise, then sanitize against the allowlist.
    if text.strip().startswith("<"):
        html = text
    else:
        html = md.markdown(text, extensions=["fenced_code"])
    return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRS, strip=True)

def to_excerpt(html: str, chars: int = 200) -> str:  # default length is an assumption
    plain = bleach.clean(html, tags=[], strip=True)
    return (plain[:chars] + "…") if len(plain) > chars else plain

WordPress client (drafts/wp_client.py)
import os, base64, requests

WP_BASE = os.getenv("WP_BASE_URL").rstrip("/")
WP_USER = os.getenv("WP_USER")
WP_APP_PASS = os.getenv("WP_APP_PASSWORD")

def _auth_header():
    token = base64.b64encode(f"{WP_USER}:{WP_APP_PASS}".encode()).decode()
    return {"Authorization": f"Basic {token}"}

def create_draft(title: str, content_html: str, excerpt: str = "", categories=None, tags=None) -> dict:
    url = f"{WP_BASE}/wp-json/wp/v2/posts"
    payload = {
        "title": title,
        "content": content_html,
        "excerpt": excerpt,
        "status": "draft",
    }
    if categories:
        payload["categories"] = categories
    if tags:
        payload["tags"] = tags
    r = requests.post(url, json=payload, headers=_auth_header(), timeout=20)
    r.raise_for_status()
    return r.json()

Models (drafts/models.py)
from django.db import models
from django.contrib.auth import get_user_model

User = get_user_model()

class DraftJob(models.Model):
    STATUS_CHOICES = [
        ("queued", "queued"),
        ("running", "running"),
        ("succeeded", "succeeded"),
        ("failed", "failed"),
    ]
    user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True)
    topic = models.CharField(max_length=300)
    outline = models.JSONField(null=True, blank=True)
    tone = models.CharField(max_length=40, default="concise")
    words = models.PositiveIntegerField(default=900)
    wp_post_id = models.PositiveIntegerField(null=True, blank=True)
    status = models.CharField(max_length=16, choices=STATUS_CHOICES, default="queued")
    error = models.TextField(blank=True, default="")
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

Celery config (aidrafts/celery.py)
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "aidrafts.settings")
app = Celery("aidrafts")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

__init__.py in project (aidrafts/__init__.py)
from .celery import app as celery_app
__all__ = ("celery_app",)

Celery task (drafts/tasks.py)
import time
from celery import shared_task
from django.db import transaction
from .models import DraftJob
from .openai_client import generate_article
from .format import to_html, to_excerpt
from .wp_client import create_draft

@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def generate_draft(self, job_id: int):
    job = DraftJob.objects.get(id=job_id)
    try:
        with transaction.atomic():
            job.status = "running"
            job.save(update_fields=["status", "updated_at"])
        ai = generate_article(job.topic, job.outline, job.tone, job.words)
        html = to_html(ai["content"])
        excerpt = to_excerpt(html)
        wp = create_draft(title=job.topic, content_html=html, excerpt=excerpt)
        job.wp_post_id = wp.get("id")
        job.status = "succeeded"
        job.save(update_fields=["wp_post_id", "status", "updated_at"])
        return {"wp_post_id": job.wp_post_id}
    except Exception as e:
        # When exc= is passed, self.retry() re-raises the original exception (not
        # MaxRetriesExceededError) once retries run out, so check the count first.
        if self.request.retries >= self.max_retries:
            job.status = "failed"
            job.error = str(e)
            job.save(update_fields=["status", "error", "updated_at"])
            raise
        raise self.retry(exc=e, countdown=min(300, 30*(self.request.retries+1)**2))

API views (drafts/views.py)
from rest_framework import serializers, permissions, status
from rest_framework.views import APIView
from rest_framework.response import Response
from .models import DraftJob
from .tasks import generate_draft

class DraftRequestSerializer(serializers.Serializer):
    topic = serializers.CharField(max_length=300)
    outline = serializers.ListField(child=serializers.CharField(), required=False)
    tone = serializers.CharField(required=False, default="concise")
    words = serializers.IntegerField(required=False, min_value=200, max_value=2000, default=900)

class DraftJobSerializer(serializers.ModelSerializer):
    class Meta:
        model = DraftJob
        fields = ["id", "topic", "status", "wp_post_id", "error", "created_at", "updated_at"]

class DraftsView(APIView):
    permission_classes = [permissions.IsAuthenticated]

    def post(self, request):
        s = DraftRequestSerializer(data=request.data)
        s.is_valid(raise_exception=True)
        job = DraftJob.objects.create(user=request.user, **s.validated_data)
        generate_draft.delay(job.id)
        return Response(DraftJobSerializer(job).data, status=status.HTTP_202_ACCEPTED)

    def get(self, request):
        jobs = DraftJob.objects.filter(user=request.user).order_by("-created_at")[:50]
        return Response(DraftJobSerializer(jobs, many=True).data)

URLs (aidrafts/urls.py)
from django.contrib import admin
from django.urls import path
from drafts.views import DraftsView

urlpatterns = [
    path("admin/", admin.site.urls),
    path("api/drafts", DraftsView.as_view()),
]

Run services
– Start Redis: redis-server
– Start Celery worker: celery -A aidrafts worker -l info --concurrency=4
– Start Django: python manage.py runserver 0.0.0.0:8000

WordPress setup
– Create a dedicated user with Editor role (or Author).
– Generate an Application Password in the user profile. Copy it once.
– Ensure permalinks and REST are enabled.
– Optional: lock down WP REST with application firewall rules for your IPs.

Testing the flow
1) Create a user/token in Django (e.g., via DRF authtoken or your auth).
2) POST to /api/drafts with Authorization: Token <your-token>. Example JSON:
{
  "topic": "Automating invoice data extraction with Django + DocAI",
  "outline": ["Overview", "Pipeline design", "Parsing edge cases", "Deployment"],
  "tone": "practical",
  "words": 1000
}
3) Check GET /api/drafts for status. When succeeded, log into WordPress and find the new draft.

Production notes
– Auth: Prefer OAuth2 or a service account bearer gateway to avoid Basic Auth in WP. If you must use Application Passwords, restrict capabilities and IPs.
– Timeouts: OpenAI and WP requests use explicit timeouts; Celery handles retries with backoff.
– Observability: Add request IDs, structure logs (JSON), and push Celery metrics to Prometheus.
– Security: Store secrets in a secrets manager. Enforce HTTPS everywhere and keep CORS off by default.
– Content safety: Apply a server-side allowlist/denylist scan before pushing to WP.
– Rate limits: Use a Celery queue per tenant and add leaky-bucket throttling if you have many users.
– Idempotency: Add a de-dup key on (user, topic hash) to prevent duplicate drafts.
– Block editor: If you want block-level control, convert markdown to blocks server-side before posting.

Simple block conversion example (optional)
def md_to_blocks(html: str):
    # Minimal example: wrap as a single HTML block
    return f"<!-- wp:html -->{html}<!-- /wp:html -->"

Then call create_draft with content_html=md_to_blocks(html).

Cleanup tasks (optional)
– Nightly task to purge failed jobs older than 30 days.
– Sync WP post status back to Django for analytics.

Next steps
– Add title generation separate from topic, with keyword constraints.
– Add media support: upload images to WP media endpoint, then embed in content.
– Add a review checklist to the draft footer, visible only in editor.

You now have a robust, background-processed AI draft writer that safely publishes to WordPress as drafts and scales in production.

Build a Production AI Summary Service for WordPress with FastAPI (Secure, Rate‑Limited)

Overview
Goal: Generate post summaries in WordPress using a secure backend AI service. We’ll build:
– A FastAPI AI gateway that wraps OpenAI, enforces per-site keys, rate limits, and logs requests.
– A minimal WordPress plugin that adds a “Generate Summary” button in the editor, calling your server from PHP (not the browser).

Why this pattern
– No OpenAI keys in the browser.
– Central rate limiting, cost controls, and observability.
– Versionable, testable, swappable models.

Architecture
– WordPress (admin): Button → wp_ajax action (server-side) → FastAPI gateway → OpenAI API.
– FastAPI: Verifies X-Client-Key, applies rate limits, logs to SQLite/Postgres, proxies to OpenAI.
– Optional: Redis for tighter rate limits; Docker for deploys.

Prereqs
– Python 3.11+, Node not required.
– WordPress 6.3+ (Classic or Block editor).
– OpenAI API key stored only on the server.
– Optional: Docker, Redis.

1) FastAPI AI Gateway
Create project
– mkdir ai-gateway && cd ai-gateway
– python -m venv .venv && source .venv/bin/activate
– pip install fastapi uvicorn httpx pydantic python-dotenv slowapi sqlalchemy aiosqlite

.env (do not commit)
– OPENAI_API_KEY=sk-…
– CLIENT_KEYS=siteAkey123,siteBkey456
– MODEL=gpt-4o-mini
– LOG_DB=sqlite+aiosqlite:///./logs.db
– RATE_LIMIT=30/minute

main.py
– A secure proxy with per-site key auth, rate limit, logging, and basic cost tracking.

import os
import time

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

load_dotenv()  # picks up .env for local runs; Docker passes the same values via --env-file

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
CLIENT_KEYS = {k.strip() for k in os.getenv("CLIENT_KEYS", "").split(",") if k.strip()}
MODEL = os.getenv("MODEL", "gpt-4o-mini")
LOG_DB = os.getenv("LOG_DB", "sqlite+aiosqlite:///./logs.db")
RATE_LIMIT = os.getenv("RATE_LIMIT", "30/minute")

limiter = Limiter(key_func=get_remote_address, default_limits=[RATE_LIMIT])
app = FastAPI()
app.state.limiter = limiter

@app.exception_handler(RateLimitExceeded)
async def rate_limited(request: Request, exc: RateLimitExceeded):
    # Exception handlers must return a Response object, not a tuple
    return JSONResponse(status_code=429, content={"detail": "rate_limited"})

app.add_middleware(
    CORSMiddleware,
    allow_origins=[],          # server-to-server only; no browser origins allowed
    allow_credentials=False,
    allow_methods=["POST"],
    allow_headers=["*"],
)

engine = create_async_engine(LOG_DB, future=True, echo=False)
SessionLocal = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

async def init_db():
    # SQLite DDL; adjust the id column (e.g., SERIAL/IDENTITY) when moving to Postgres
    async with engine.begin() as conn:
        await conn.execute(text("""
            CREATE TABLE IF NOT EXISTS logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ts REAL,
                client_key TEXT,
                model TEXT,
                prompt_tokens INTEGER,
                completion_tokens INTEGER,
                cost_usd REAL
            )"""))

@app.on_event("startup")
async def on_startup():
    await init_db()

class ChatMessage(BaseModel):
    role: str
    content: str

class ChatRequest(BaseModel):
    messages: list[ChatMessage]
    temperature: float | None = 0.3
    model: str | None = None
    max_tokens: int | None = 256

def estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    # Simple placeholder; adjust with vendor pricing
    return prompt_tokens * 0.000001 + completion_tokens * 0.000002

@app.post("/v1/chat/completions")
@limiter.limit(RATE_LIMIT)
async def chat_completions(
    request: Request,  # slowapi requires the Request argument on rate-limited routes
    payload: ChatRequest,
    x_client_key: str = Header(default=""),
):
    if not x_client_key or x_client_key not in CLIENT_KEYS:
        raise HTTPException(status_code=401, detail="invalid_client_key")

    model = payload.model or MODEL
    body = {
        "model": model,
        "messages": [m.model_dump() for m in payload.messages],  # Pydantic v2; use .dict() on v1
        "temperature": payload.temperature,
        "max_tokens": payload.max_tokens,
    }

    async with httpx.AsyncClient(timeout=60) as client:
        r = await client.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
            json=body,
        )
    if r.status_code >= 400:
        raise HTTPException(status_code=r.status_code, detail=r.text)

    data = r.json()
    # basic token extraction (vendor-dependent)
    usage = data.get("usage", {}) or {}
    pt = usage.get("prompt_tokens", 0)
    ct = usage.get("completion_tokens", 0)
    cost = estimate_cost(model, pt, ct)

    async with SessionLocal() as s:
        await s.execute(
            text("INSERT INTO logs (ts, client_key, model, prompt_tokens, completion_tokens, cost_usd) VALUES (:ts,:ck,:m,:pt,:ct,:c)"),
            {"ts": time.time(), "ck": x_client_key, "m": model, "pt": pt, "ct": ct, "c": cost},
        )
        await s.commit()

    return data
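
The flat per-token rates in estimate_cost are placeholders. One way to make cost tracking model-aware is a small price table keyed by model name. The sketch below assumes hypothetical names (PRICES_PER_1M, estimate_cost_by_model) and made-up prices; none of the numbers are vendor pricing, so replace them with your provider's current price sheet.

```python
from decimal import Decimal

# Hypothetical prices in USD per 1M tokens: (prompt, completion).
# Illustrative placeholders only, not real vendor pricing.
PRICES_PER_1M = {
    "example-small": (Decimal("1.00"), Decimal("2.00")),
    "example-large": (Decimal("10.00"), Decimal("30.00")),
}
# Fallback used for models missing from the table
DEFAULT_PRICE = (Decimal("1.00"), Decimal("2.00"))


def estimate_cost_by_model(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Return an estimated cost in USD for one request."""
    prompt_price, completion_price = PRICES_PER_1M.get(model, DEFAULT_PRICE)
    total = (prompt_tokens * prompt_price + completion_tokens * completion_price) / Decimal(1_000_000)
    return float(total)
```

Decimal keeps the arithmetic exact until the final conversion, which matters once many small per-request costs are summed in the logs table.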

Run locally
– uvicorn main:app --host 0.0.0.0 --port 8080

2) Dockerize (optional)
Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV PYTHONUNBUFFERED=1
CMD ["uvicorn","main:app","--host","0.0.0.0","--port","8080"]

requirements.txt
fastapi
uvicorn
httpx
pydantic
python-dotenv
slowapi
sqlalchemy
aiosqlite

Build/run
– docker build -t ai-gateway:latest .
– docker run -p 8080:8080 --env-file .env ai-gateway:latest

3) WordPress plugin (admin-only button)
Create wp-content/plugins/ai-summary/ai-summary.php

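<?php
/*
Plugin Name: AI Summary
Description: Adds a "Generate Summary" button that calls the FastAPI AI gateway from PHP.
Version: 1.0.0
*/
if (!defined('ABSPATH')) exit;

add_action('add_meta_boxes', function () {
    add_meta_box('ai_summary_box', 'AI Summary', 'ai_summary_box_html', 'post', 'side');
});

function ai_summary_box_html($post) {
    $summary = get_post_meta($post->ID, '_ai_summary', true);
    wp_nonce_field('ai_summary_action', 'ai_summary_nonce');
    // The script below expects a textarea #ai_summary and a button #ai-generate-summary
    echo '<textarea id="ai_summary" name="ai_summary" rows="6" style="width:100%;">' . esc_textarea($summary) . '</textarea>';
    echo '<p><button type="button" class="button" id="ai-generate-summary">Generate Summary</button></p>';
    ?>
    <script>
    (function(){
      document.getElementById('ai-generate-summary').addEventListener('click', function(e){
        e.preventDefault();
        const btn=this; btn.disabled=true; btn.textContent='Generating…';
        const data = new FormData();
        data.append('action','ai_generate_summary');
        data.append('post_id','<?php echo esc_js($post->ID); ?>');
        data.append('ai_summary_nonce','<?php echo esc_js(wp_create_nonce('ai_summary_action')); ?>');
        fetch(ajaxurl, { method:'POST', credentials:'same-origin', body:data })
          .then(r=>r.json()).then(j=>{
            if(j.success){ document.getElementById('ai_summary').value=j.data.summary; }
            else { alert(j.data || 'Error'); }
          }).catch(()=>alert('Request failed')).finally(()=>{btn.disabled=false; btn.textContent='Generate Summary';});
      });
    })();
    </script>
    <?php
}

add_action('wp_ajax_ai_generate_summary', function () {
    check_ajax_referer('ai_summary_action', 'ai_summary_nonce');
    $post_id = intval($_POST['post_id'] ?? 0);
    if (!$post_id || !current_user_can('edit_post', $post_id)) wp_send_json_error('Forbidden', 403);

    $post = get_post($post_id);
    if (!$post) wp_send_json_error('Post not found', 404);

    // Gateway settings come from the environment (see step 4), never from the plugin source
    $gateway_url = getenv('AI_GATEWAY_URL');
    $client_key = getenv('AI_GATEWAY_CLIENT_KEY');
    if (!$gateway_url || !$client_key) wp_send_json_error('Gateway not configured', 500);

    $content = wp_strip_all_tags($post->post_content);
    $messages = [
        ["role"=>"system","content"=>"You summarize WordPress posts for busy readers. 3-4 sentences, neutral tone."],
        ["role"=>"user","content"=>"Summarize:\n\n".$content]
    ];

    $body = json_encode([
        "model" => null, // null lets the gateway use its configured default model
        "temperature" => 0.2,
        "max_tokens" => 220,
        "messages" => $messages
    ]);

    $args = [
        'headers' => [
            'Content-Type' => 'application/json',
            'X-Client-Key' => $client_key
        ],
        'body' => $body,
        'timeout' => 30,
        'method' => 'POST'
    ];

    $res = wp_remote_post($gateway_url, $args);
    if (is_wp_error($res)) wp_send_json_error('Gateway error', 500);

    $code = wp_remote_retrieve_response_code($res);
    $resp = json_decode(wp_remote_retrieve_body($res), true);

    if ($code >= 400 || !isset($resp['choices'][0]['message']['content'])) {
        wp_send_json_error('AI error', 500);
    }

    $summary = sanitize_textarea_field($resp['choices'][0]['message']['content']);
    update_post_meta($post_id, '_ai_summary', $summary);
    wp_send_json_success(['summary'=>$summary]);
});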

4) Configure environment on WordPress host
– In wp-config.php (or server env), set:
– putenv('AI_GATEWAY_URL=https://your-api.example.com/v1/chat/completions');
– putenv('AI_GATEWAY_CLIENT_KEY=siteAkey123');
– Do not hardcode secrets in the plugin.
– Restrict ajax action to admins/editors via capability checks (already included).

5) Test locally
– Start FastAPI.
– In WP admin, edit a post → AI Summary box → Generate Summary.
– Check FastAPI logs table: sqlite file logs.db should show entries.

6) Production considerations
– HTTPS only. Put the gateway behind a CDN/WAF (IP allowlists optional).
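– Rate limits: slowapi applies RATE_LIMIT per caller IP (get_remote_address), which for server-side WordPress calls means per site host. To limit per client key instead, pass a key_func that reads the X-Client-Key header.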
– Rotate CLIENT_KEYS regularly; store in secret manager.
– Logging: move from SQLite to Postgres (LOG_DB=postgresql+asyncpg://…).
– Add retry and circuit breaker to WordPress (e.g., increase timeout, handle 429 gracefully).
– Model control: pin model in env; add allowlist in gateway.
– Token/cost guardrails: enforce max_tokens and max prompt length in FastAPI.
– Observability: ship logs to OpenTelemetry/Prometheus; alert on spikes.

7) Optional enhancements
– Add /health and /usage endpoints to inspect quotas.
– Caching summaries by post hash to avoid re-billing unchanged content.
– Stream responses (server-sent events) to show progress in the admin UI.
– Add multilingual system prompts per site.
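
Caching summaries by post hash could look like the sketch below. It is an in-memory illustration under stated assumptions: summary_cache_key and SummaryCache are hypothetical names, and a real deployment would more likely keep the entries in Redis or a database table.

```python
import hashlib
from typing import Callable


def summary_cache_key(post_id: int, content: str, model: str) -> str:
    """Key changes whenever the post text or the model changes."""
    normalized = " ".join(content.split())  # ignore whitespace-only edits
    digest = hashlib.sha256(f"{model}\n{normalized}".encode("utf-8")).hexdigest()
    return f"summary:{post_id}:{digest[:32]}"


class SummaryCache:
    """Tiny in-memory cache; swap the dict for Redis in production."""

    def __init__(self) -> None:
        self._store: dict[str, str] = {}

    def get_or_create(self, post_id: int, content: str, model: str,
                      generate: Callable[[str], str]) -> str:
        key = summary_cache_key(post_id, content, model)
        if key not in self._store:
            # Only unchanged content is served from cache; edits trigger a new call
            self._store[key] = generate(content)
        return self._store[key]
```

Because the key includes the model name, switching models invalidates old summaries without any explicit purge step.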

Quick smoke test (curl)
curl -s -X POST https://your-api.example.com/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "X-Client-Key: siteAkey123" \
  -d '{"messages":[{"role":"user","content":"Say hi"}]}' | jq .

You now have a secure, rate-limited AI summary workflow wired into WordPress with a backend you control.

Secure OpenAI Proxy for WordPress: FastAPI + HMAC + Rate Limiting (Step-by-Step)

Overview
– Goal: Call OpenAI from WordPress without storing API keys in WP.
– Architecture: WordPress plugin -> HTTPS -> FastAPI proxy -> OpenAI API.
– Security: HMAC-signed requests, short time window, rate limiting, CORS, Cloudflare.
– Outcomes: Streaming responses, reduced latency, auditability.

1) FastAPI Proxy (Python)
Create project structure:
.
├─ app/
│ ├─ main.py
│ ├─ security.py
│ ├─ limiter.py
│ └─ models.py
├─ requirements.txt
├─ .env
└─ Dockerfile

requirements.txt:
fastapi==0.115.4
uvicorn[standard]==0.30.6
httpx==0.27.2
pydantic==2.9.2
python-dotenv==1.0.1
redis==5.0.7
limits==3.13.0
slowapi==0.1.8
orjson==3.10.7

.env (do not commit):
OPENAI_API_KEY=sk-…
HMAC_SHARED_SECRET=super-long-random
ALLOWED_ORIGINS=https://example.com
RATE_LIMIT=60/minute
ALLOWED_DRIFT_SECONDS=30

app/models.py:
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any

class ChatMessage(BaseModel):
    role: str
    content: str

class ChatRequest(BaseModel):
    model: str = Field(default="gpt-4o-mini")
    messages: List[ChatMessage]
    temperature: float = 0.2
    max_tokens: Optional[int] = 512
    stream: bool = True
    metadata: Optional[Dict[str, Any]] = None

app/security.py:
import hashlib
import hmac
import os
import time

from fastapi import HTTPException

SHARED_SECRET = os.getenv("HMAC_SHARED_SECRET", "")
ALLOWED_DRIFT = int(os.getenv("ALLOWED_DRIFT_SECONDS", "30"))

def verify_hmac_signature(body: bytes, x_signature: str | None, x_timestamp: str | None) -> None:
    if not SHARED_SECRET:
        # Fail closed: with an empty secret anyone could forge a valid signature
        raise HTTPException(status_code=500, detail="Server misconfigured")
    if not (x_signature and x_timestamp):
        raise HTTPException(status_code=401, detail="Missing signature headers")
    try:
        ts = int(x_timestamp)
    except ValueError:
        raise HTTPException(status_code=401, detail="Invalid timestamp")
    if abs(int(time.time()) - ts) > ALLOWED_DRIFT:
        raise HTTPException(status_code=401, detail="Stale request")
    # Signed message is "<timestamp>.<raw body bytes>"
    mac = hmac.new(SHARED_SECRET.encode(), msg=(x_timestamp + ".").encode() + body, digestmod=hashlib.sha256)
    expected = "sha256=" + mac.hexdigest()
    # Compare as bytes so a non-ASCII header value cannot raise TypeError
    if not hmac.compare_digest(expected.encode(), x_signature.encode()):
        raise HTTPException(status_code=401, detail="Bad signature")

app/limiter.py:
import os
from slowapi import Limiter
from slowapi.util import get_remote_address

# slowapi builds its Redis-backed storage from storage_uri, so no separate storage object is needed
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
limiter = Limiter(key_func=get_remote_address, storage_uri=redis_url)

app/main.py:
import os

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from .models import ChatRequest
from .security import verify_hmac_signature
from .limiter import limiter
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ALLOWED_ORIGINS = os.getenv("ALLOWED_ORIGINS", "").split(",")
RATE_LIMIT = os.getenv("RATE_LIMIT", "60/minute")

app = FastAPI()
app.state.limiter = limiter  # SlowAPIMiddleware looks the limiter up on app.state
app.add_middleware(SlowAPIMiddleware)

if ALLOWED_ORIGINS and ALLOWED_ORIGINS[0]:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=[o.strip() for o in ALLOWED_ORIGINS],
        allow_credentials=False,
        allow_methods=["POST", "OPTIONS"],
        allow_headers=["*"],
    )

@app.exception_handler(RateLimitExceeded)
def ratelimit_handler(request, exc):
    return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"})

@app.post("/v1/chat/completions")
@limiter.limit(RATE_LIMIT)
async def chat(request: Request):
    # Verify the signature over the raw bytes before parsing anything
    raw = await request.body()
    xsig = request.headers.get("x-signature")
    xts = request.headers.get("x-timestamp")
    verify_hmac_signature(raw, xsig, xts)

    data = ChatRequest.model_validate_json(raw)
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": data.model,
        "messages": [m.model_dump() for m in data.messages],
        "temperature": data.temperature,
        "max_tokens": data.max_tokens,
        "stream": data.stream,
    }

    async def stream():
        # Relay upstream SSE bytes as they arrive
        timeout = httpx.Timeout(30.0, read=60.0, write=30.0, connect=10.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            async with client.stream("POST", "https://api.openai.com/v1/chat/completions", headers=headers, json=payload) as r:
                async for chunk in r.aiter_bytes():
                    yield chunk

    if data.stream:
        return StreamingResponse(stream(), media_type="text/event-stream")
    else:
        async with httpx.AsyncClient(timeout=30.0) as client:
            r = await client.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
        return JSONResponse(status_code=r.status_code, content=r.json())

Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app ./app
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Local run:
docker build -t yourrepo/openai-proxy:latest .
docker run --env-file .env -p 8000:8000 yourrepo/openai-proxy:latest

2) WordPress Plugin (HMAC + Admin UI)
Create wp-content/plugins/ai-secure-proxy/ai-secure-proxy.php:
<?php
/*
Plugin Name: AI Secure Proxy
Description: Calls a secured FastAPI proxy with HMAC-signed requests.
Version: 1.0.0
*/
if (!defined('ABSPATH')) exit;

class AISecureProxy {
    const OPT_ENDPOINT = 'ai_proxy_endpoint';
    const OPT_SECRET = 'ai_proxy_secret';

    public function __construct() {
        add_action('admin_menu', [$this,'menu']);
        add_action('admin_init', [$this,'settings']);
        add_action('wp_ajax_ai_proxy_chat', [$this,'handle_ajax']);
        add_action('add_meta_boxes', [$this,'add_box']);
        add_action('admin_enqueue_scripts', [$this,'enqueue']);
    }

    public function menu() {
        add_options_page('AI Secure Proxy','AI Secure Proxy','manage_options','ai-secure-proxy',[$this,'render']);
    }

    public function settings() {
        register_setting('ai-secure-proxy','ai_proxy_endpoint');
        register_setting('ai-secure-proxy','ai_proxy_secret');
        add_settings_section('ai-secure-proxy-sec','Settings',null,'ai-secure-proxy');
        add_settings_field(self::OPT_ENDPOINT,'Proxy Endpoint',[$this,'field_endpoint'],'ai-secure-proxy','ai-secure-proxy-sec');
        add_settings_field(self::OPT_SECRET,'Shared Secret',[$this,'field_secret'],'ai-secure-proxy','ai-secure-proxy-sec');
    }

    public function field_endpoint(){ $v=esc_attr(get_option(self::OPT_ENDPOINT,'')); echo "<input type='url' name='ai_proxy_endpoint' value='$v' class='regular-text' />"; }
    public function field_secret(){ $v=esc_attr(get_option(self::OPT_SECRET,'')); echo "<input type='password' name='ai_proxy_secret' value='$v' class='regular-text' />"; }

    public function render(){
        echo "<div class='wrap'><h1>AI Secure Proxy</h1><form method='post' action='options.php'>";
        settings_fields('ai-secure-proxy'); do_settings_sections('ai-secure-proxy'); submit_button();
        echo "</form></div>";
    }

    public function add_box() {
        add_meta_box('ai-secure-box','AI Assistant',[$this,'box_html'],'post','side');
    }

    public function box_html($post){
        // ui.js expects these three element IDs: ai_prompt, ai_generate, ai_output
        echo "<textarea id='ai_prompt' rows='3' style='width:100%;' placeholder='What should the assistant do?'></textarea>";
        echo "<p><button type='button' class='button' id='ai_generate'>Generate</button></p>";
        echo "<div id='ai_output' style='white-space:pre-wrap;'></div>";
        wp_nonce_field('ai_secure_proxy','ai_secure_nonce');
    }

    public function enqueue($hook){
        if ($hook !== 'post.php' && $hook !== 'post-new.php') return;
        wp_enqueue_script('ai-secure-proxy-js', plugin_dir_url(__FILE__).'ui.js', ['jquery'], '1.0', true);
        wp_localize_script('ai-secure-proxy-js','AIProxy',[
            'ajaxurl'=>admin_url('admin-ajax.php'),
            'nonce'=>wp_create_nonce('ai_secure_proxy')
        ]);
    }

    private function sign($timestamp, $body, $secret){
        // Must match the proxy: HMAC-SHA256 over "<timestamp>.<body>"
        return 'sha256=' . hash_hmac('sha256', $timestamp . '.' . $body, $secret);
    }

    public function handle_ajax(){
        check_ajax_referer('ai_secure_proxy','nonce');
        if (!current_user_can('edit_posts')) wp_send_json_error('forbidden', 403);

        $endpoint = get_option(self::OPT_ENDPOINT,'');
        $secret = get_option(self::OPT_SECRET,'');
        if (!$endpoint || !$secret) wp_send_json_error('not_configured', 400);

        $post_id = intval($_POST['post_id'] ?? 0);
        $prompt = sanitize_text_field($_POST['prompt'] ?? '');
        $content = get_post_field('post_content', $post_id);

        $messages = [
            ['role'=>'system','content'=>'You are a concise assistant for WordPress editors.'],
            ['role'=>'user','content'=>"Post content:\n".$content."\n\nTask:\n".$prompt]
        ];
        // Sign and send the exact same string; any re-encoding breaks the signature
        $body = wp_json_encode(['model'=>'gpt-4o-mini','messages'=>$messages,'temperature'=>0.2,'stream'=>false], JSON_UNESCAPED_SLASHES);
        $ts = strval(time());
        $sig = $this->sign($ts, $body, $secret);

        $args = [
            'headers'=>[
                'Content-Type'=>'application/json',
                'x-signature'=>$sig,
                'x-timestamp'=>$ts
            ],
            'body'=>$body,
            'timeout'=>45
        ];
        $res = wp_remote_post($endpoint, $args);
        if (is_wp_error($res)) wp_send_json_error($res->get_error_message(), 500);

        $code = wp_remote_retrieve_response_code($res);
        $data = json_decode(wp_remote_retrieve_body($res), true);
        if ($code !== 200) wp_send_json_error($data['detail'] ?? 'proxy_error', $code);

        $text = $data['choices'][0]['message']['content'] ?? '';
        wp_send_json_success(['text'=>$text]);
    }
}
new AISecureProxy();

Create ui.js:
jQuery(function($){
  $('#ai_generate').on('click', function(e){
    e.preventDefault();
    const prompt = $('#ai_prompt').val();
    $('#ai_output').text('Generating…');
    $.post(AIProxy.ajaxurl, {
      action: 'ai_proxy_chat',
      nonce: AIProxy.nonce,
      post_id: $('#post_ID').val(),
      prompt: prompt
    }).done(function(res){
      if (res.success) $('#ai_output').text(res.data.text || '(no output)');
      else $('#ai_output').text('Error: ' + (res.data || 'unknown'));
    }).fail(function(xhr){
      $('#ai_output').text('HTTP ' + xhr.status + ': ' + xhr.responseText);
    });
  });
});

3) Test the Proxy
– Start Redis first (locally, via Docker Compose, or a managed service); the rate limiter stores its counters there.
– curl test (replace values):
TS=$(date +%s)
BODY='{"model":"gpt-4o-mini","messages":[{"role":"user","content":"Say hi"}],"stream":false}'
SIG='sha256='$(printf "%s.%s" "$TS" "$BODY" | openssl dgst -sha256 -hmac 'super-long-random' -hex | sed 's/^.* //')
curl -i -X POST http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "x-timestamp: $TS" \
  -H "x-signature: $SIG" \
  -d "$BODY"

– In WordPress: set endpoint to your deployed URL, add the same shared secret, open a post, click Generate.

4) Deployment
– Run behind a reverse proxy (Nginx) with HTTP/2 and gzip on. Set client_max_body_size 512k.
– Put Cloudflare in front. Enable WAF, Bot Fight Mode, rate limiting rule for path /v1/chat/completions.
– Restrict origins via ALLOWED_ORIGINS. Optionally IP allowlist your WP server.
– Use Docker on a small VM (2 vCPU/2GB). Autoscale if RPS > 20 sustained.
– Rotate HMAC secret quarterly. Store in secrets manager.
– Add a health check endpoint (for example GET /) if desired; the proxy code above does not define one.

5) Performance Notes
– Use stream=false in admin to keep the UI simple. For front-end UIs, enable stream=true and process SSE.
– Set temperature low for predictable drafts.
– Batch common system prompts to reduce tokens.
– Cache frequent prompts behind Redis keyed by normalized messages if costs matter.

6) Security Checklist
– Never put OpenAI keys in WordPress or client JS.
– Verify HMAC with timestamp drift window <= 30s.
– Enforce rate limits per IP and/or per site key.
– Limit models allowed by the proxy to a safe allowlist.
– Log request IDs, latency, and token counts (without storing content if sensitive).
– Keep dependencies patched and lock versions.

7) Optional: Nginx Snippet
server {
    listen 443 ssl http2;
    server_name proxy.example.com;
    client_max_body_size 512k;

    location / {
        proxy_pass http://127.0.0.1:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_read_timeout 65s;
    }
}

Troubleshooting
– 401 stale request: check server time sync (use NTP).
– 401 bad signature: confirm same secret and exact body bytes.
– 429: raise RATE_LIMIT or verify Cloudflare rules.
– CORS errors: set ALLOWED_ORIGINS to your WP domain.

What to Build Next
– Add JWT per-editor scoping on WP side.
– Add content policy filter in proxy (regex/lint) before calling OpenAI.
– Add billing/usage metering per site.

Build a Production RAG API with Django + pgvector and a WordPress Shortcode Client

Overview
This tutorial shows how to implement a production-ready Retrieval-Augmented Generation (RAG) API using Django + Postgres (pgvector) and consume it from WordPress via a secure shortcode plugin. We’ll cover schema, ingestion, embeddings, retrieval, generation, auth, caching, and operational hardening.

What you’ll build
– Django service: /api/rag/query for answers with citations
– Postgres + pgvector for semantic search
– Background ingestion + batched embeddings
– OpenAI gpt-4o-mini (or any) for grounded responses
– WordPress plugin: [rag_ask] shortcode with a simple UI, JWT auth, and result caching

Prerequisites
– Python 3.10+, Django 4+
– Postgres 14+ with pgvector extension
– OpenAI API key
– WordPress 6+, admin access
– A domain with HTTPS

1) Provision Postgres with pgvector
Enable extension:
CREATE EXTENSION IF NOT EXISTS vector;

Recommended settings (postgresql.conf):
– shared_buffers: 25% RAM
– effective_cache_size: 50–75% RAM
– work_mem: 64–256MB
– maintenance_work_mem: 512MB+
– wal_compression = on

2) Django project setup
mkdir rag_service && cd rag_service
python -m venv .venv && source .venv/bin/activate
pip install django djangorestframework "psycopg[binary,pool]" pydantic "openai==1.*"
django-admin startproject core .
python manage.py startapp rag

In core/settings.py
– Add rest_framework, rag
– Configure DATABASES for Postgres
– Set ALLOWED_HOSTS, CSRF_TRUSTED_ORIGINS
– Add a simple JWT secret for signed requests (e.g., RAG_JWT_SECRET in env)

3) Models and migrations
rag/models.py
from django.db import models

class Document(models.Model):
    source_id = models.CharField(max_length=255, unique=True)
    title = models.CharField(max_length=500)
    url = models.URLField(blank=True, null=True)
    created_at = models.DateTimeField(auto_now_add=True)

class Chunk(models.Model):
    document = models.ForeignKey(Document, on_delete=models.CASCADE, related_name="chunks")
    ordinal = models.IntegerField()
    text = models.TextField()
    # Optional float32 bytes copy; nullable because ingestion creates the row first
    # and writes the embedding to the pgvector column (embedding_vec) afterwards
    embedding = models.BinaryField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)

Run:
python manage.py makemigrations
python manage.py migrate

Create embedding index
In psql:
-- 1536 is the default size of text-embedding-3-small. text-embedding-3-large defaults to 3072,
-- so request dimensions=1536 when using it (ivfflat indexes support at most 2,000 dimensions).
ALTER TABLE rag_chunk ADD COLUMN IF NOT EXISTS embedding_vec vector(1536);
-- Backfill from the binary column only if you plan to keep both; otherwise store directly as vector.
CREATE INDEX IF NOT EXISTS idx_chunk_embedding_vec ON rag_chunk USING ivfflat (embedding_vec vector_cosine_ops) WITH (lists = 100);

Note: In production, store embeddings directly in the embedding_vec vector column. The code below does that.
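
pgvector accepts vectors as text literals of the form [x1,x2,...], so one driver-independent way to write to embedding_vec is to format the embedding yourself and cast the parameter. The helper below is a sketch: to_vector_literal and EMBED_DIM are hypothetical names, and the dimension check assumes the vector(1536) column defined above.

```python
import math

EMBED_DIM = 1536  # assumption: must match the vector(1536) column


def to_vector_literal(emb: list[float], dim: int = EMBED_DIM) -> str:
    """Format an embedding as a pgvector text literal such as '[0.1,0.2,0.3]'."""
    if len(emb) != dim:
        raise ValueError(f"expected {dim} dimensions, got {len(emb)}")
    if not all(math.isfinite(x) for x in emb):
        raise ValueError("embedding contains NaN or infinity")
    return "[" + ",".join(repr(float(x)) for x in emb) + "]"
```

It would be used with a cast in the SQL, for example: cur.execute("UPDATE rag_chunk SET embedding_vec = %s::vector WHERE id = %s", (to_vector_literal(emb), chunk_id)). Catching a dimension mismatch in Python gives a clearer error than the one Postgres raises at insert time.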

4) Settings and env
.env
OPENAI_API_KEY=sk-…
RAG_JWT_SECRET=super-long-random
DJANGO_DEBUG=False

Load env in settings or via your process manager.

5) Embedding utilities
rag/embeddings.py
import os
from openai import OpenAI
from django.db import connection

EMBED_MODEL = "text-embedding-3-large"
EMBED_DIM = 1536  # must match the vector(1536) column; the model's default is 3072
_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def get_embedding(text: str) -> list[float]:
    # dimensions= asks the API for a shortened embedding that fits the column and the ivfflat limit
    resp = _client.embeddings.create(model=EMBED_MODEL, input=text, dimensions=EMBED_DIM)
    return resp.data[0].embedding

def insert_chunk_embedding(chunk_id: int, emb: list[float]):
    # Write directly to the vector column; the explicit cast converts the float array parameter
    with connection.cursor() as cur:
        cur.execute("UPDATE rag_chunk SET embedding_vec = %s::vector WHERE id = %s", (emb, chunk_id))

6) Ingestion and chunking
rag/ingest.py
from .models import Document, Chunk
from .embeddings import get_embedding, insert_chunk_embedding

def chunk_text(text: str, max_tokens=400):
    # Naive split by characters (~4 characters per token); replace with tiktoken in production
    size = max_tokens * 4  # 1600 characters for the default of 400 tokens
    for i in range(0, len(text), size):
        yield text[i:i+size]

def ingest_document(source_id: str, title: str, url: str | None, text: str):
    doc, _ = Document.objects.get_or_create(source_id=source_id, defaults={"title": title, "url": url})
    if doc.chunks.exists():
        # Already ingested; skip to keep the operation idempotent
        return doc
    for idx, piece in enumerate(chunk_text(text)):
        c = Chunk.objects.create(document=doc, ordinal=idx, text=piece)
        emb = get_embedding(piece)
        insert_chunk_embedding(c.id, emb)
    return doc
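
The character-based splitter can cut a sentence or a word in half at a chunk boundary. A common refinement, sketched here with only the standard library, is to split on word boundaries and repeat a few words between consecutive chunks. chunk_words and its parameters are hypothetical, and the character budget is still only an approximation of a token budget.

```python
def chunk_words(text: str, max_chars: int = 1600, overlap_words: int = 30) -> list[str]:
    """Split text on whitespace into chunks of at most max_chars, with word overlap."""
    chunks: list[str] = []
    current: list[str] = []
    length = 0  # length of " ".join(current)
    for word in text.split():
        add = len(word) + (1 if current else 0)
        if current and length + add > max_chars:
            chunks.append(" ".join(current))
            # Carry the last few words forward so context spans the boundary
            tail = current[-overlap_words:] if overlap_words > 0 else []
            # Keep the overlap under half a chunk so every chunk makes progress
            while tail and len(" ".join(tail)) > max_chars // 2:
                tail = tail[1:]
            current = tail
            length = len(" ".join(current))
            add = len(word) + (1 if current else 0)
        current.append(word)
        length += add
    if current:
        chunks.append(" ".join(current))
    return chunks
```

A single word longer than max_chars still becomes its own oversized chunk; that edge case is left unhandled to keep the sketch short.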

7) Retrieval
rag/retrieval.py
from django.db import connection

def search_chunks(query_emb: list[float], k: int = 8):
    # <=> is pgvector's cosine distance operator, so similarity = 1 - distance.
    # The explicit ::vector cast converts the float array parameter.
    with connection.cursor() as cur:
        cur.execute("""
            SELECT c.id, c.text, d.title, d.url, 1 - (c.embedding_vec <=> %s::vector) AS score
            FROM rag_chunk c
            JOIN rag_document d ON c.document_id = d.id
            WHERE c.embedding_vec IS NOT NULL
            ORDER BY c.embedding_vec <=> %s::vector
            LIMIT %s
        """, (query_emb, query_emb, k))
        rows = cur.fetchall()
    return [{"id": r[0], "text": r[1], "title": r[2], "url": r[3], "score": float(r[4])} for r in rows]

8) Generation
rag/generate.py
import os
from openai import OpenAI

_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
GEN_MODEL = "gpt-4o-mini"

SYSTEM = "You are a factual assistant. Use provided context only. Cite sources by title and URL if present."

def answer(query: str, contexts: list[dict]) -> str:
    # Put title and URL next to each chunk so the model has something to cite
    ctx_str = "\n\n---\n\n".join(
        f"Source: {c.get('title') or 'untitled'} ({c.get('url') or 'no URL'})\n{c['text']}"
        for c in contexts
    )
    prompt = (
        f"Question: {query}\n\nContext:\n{ctx_str}\n\nInstructions:\n"
        "- Answer concisely.\n- If unsure, say you don't know.\n"
        "- Provide 2–4 citations with title and URL if available."
    )
    resp = _client.chat.completions.create(
        model=GEN_MODEL,
        messages=[{"role": "system", "content": SYSTEM},
                  {"role": "user", "content": prompt}],
        temperature=0.2,
    )
    return resp.choices[0].message.content

9) RAG API endpoint
rag/api.py
import base64
import hashlib
import hmac
import json
import logging
import os
import time

from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework import status
from .embeddings import get_embedding
from .retrieval import search_chunks
from .generate import answer

logger = logging.getLogger(__name__)
SECRET = os.getenv("RAG_JWT_SECRET", "")

def verify_token(token: str) -> bool:
    # token = base64url(header).base64url(payload).base64url(signature); simple HMAC for demo
    if not SECRET:
        return False  # fail closed when the secret is not configured
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
        signing_input = f"{header_b64}.{payload_b64}".encode()
        sig = base64.urlsafe_b64decode(sig_b64 + "===")
        expected = hmac.new(SECRET.encode(), signing_input, hashlib.sha256).digest()
        return hmac.compare_digest(sig, expected)
    except Exception:
        return False

def parse_payload(token: str) -> dict:
    payload_b64 = token.split(".")[1]
    return json.loads(base64.urlsafe_b64decode(payload_b64 + "===").decode())

@api_view(["POST"])
def rag_query(request):
    try:
        token = request.headers.get("Authorization", "").replace("Bearer ", "")
        if not token or not verify_token(token):
            return Response({"error": "unauthorized"}, status=status.HTTP_401_UNAUTHORIZED)
        payload = parse_payload(token)
        # Reject expired tokens; the WordPress side issues them with a 60-second lifetime
        if payload.get("exp", 0) < time.time():
            return Response({"error": "token_expired"}, status=status.HTTP_401_UNAUTHORIZED)
        # Optional: enforce domain or nonce from payload
        q = request.data.get("q", "").strip()
        if not q:
            return Response({"error": "missing q"}, status=status.HTTP_400_BAD_REQUEST)
        q_emb = get_embedding(q)
        hits = search_chunks(q_emb, k=8)
        # take top 4 unique docs, dedupe by document title or URL
        seen = set()
        contexts = []
        for h in hits:
            key = h["url"] or h["title"]
            if key in seen:
                continue
            seen.add(key)
            contexts.append(h)
            if len(contexts) >= 4:
                break
        content = answer(q, contexts)
        citations = [{"title": c["title"], "url": c["url"], "score": c["score"]} for c in contexts]
        return Response({"answer": content, "citations": citations})
    except Exception:
        # Log the traceback server-side; return a generic error to the client
        logger.exception("rag_query failed")
        return Response({"error": "server_error"}, status=500)

core/urls.py
from django.urls import path
from rag.api import rag_query

urlpatterns = [path("api/rag/query", rag_query)]

10) Basic rate limiting and timeouts
– Put the Django app behind a reverse proxy (nginx) with:
– proxy_read_timeout 30s
– limit_req zone=rag burst=10 nodelay
– Use gunicorn with workers = 2 * cores, timeout = 60
– Consider django-ratelimit if needed.

11) Caching embeddings and answers
– Cache answers keyed by the normalized query (lowercased, whitespace collapsed) for 5–30 minutes in Redis.
– Cache retrieval hits keyed by a hash of the query embedding for 1–5 minutes during traffic spikes.
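
The answer cache can be sketched in memory before wiring in Redis. normalize_query and TTLCache are hypothetical names; the injectable clock exists only so expiry can be tested without sleeping.

```python
import time
from typing import Any, Callable


def normalize_query(q: str) -> str:
    """Lowercase and collapse whitespace so trivially different queries share a key."""
    return " ".join(q.lower().split())


class TTLCache:
    """Minimal in-memory TTL cache keyed by the normalized query."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl_seconds
        self.clock = clock
        self._items: dict[str, tuple[float, Any]] = {}

    def get(self, q: str) -> Any:
        key = normalize_query(q)
        hit = self._items.get(key)
        if hit is None:
            return None
        expires_at, value = hit
        if self.clock() >= expires_at:
            del self._items[key]  # lazily evict expired entries
            return None
        return value

    def set(self, q: str, value: Any) -> None:
        self._items[normalize_query(q)] = (self.clock() + self.ttl, value)
```

In rag_query, the lookup would sit before get_embedding so a cache hit skips both the embedding call and the completion call. With Redis, the same normalized key plus SETEX gives the equivalent behaviour across workers.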

12) WordPress plugin (shortcode client)
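Create wp-content/plugins/rag-client/rag-client.php:

<?php
/**
 * Plugin Name: RAG Client
 */
if (!defined('ABSPATH')) { exit; }

// [rag_ask] shortcode: renders the question form and its client script.
// Assumed names: the 'endpoint' attribute, the rag-box wrapper class,
// and the rag_client_token AJAX action (hooked at the bottom of this file).
add_shortcode('rag_ask', function ($atts) {
  $atts = shortcode_atts([
    'endpoint' => esc_url(get_option('rag_client_endpoint', '')),
  ], $atts);
  $token_url = admin_url('admin-ajax.php?action=rag_client_token');

  ob_start(); ?>
  <div class="rag-box">
    <form class="rag-form">
      <input type="text" name="q" required/>
      <button type="submit">Ask</button>
    </form>
    <div class="rag-result"></div>
  </div>
  <script>
  (function(){
    const box = document.currentScript.previousElementSibling;
    const form = box.querySelector('.rag-form');
    const out = box.querySelector('.rag-result');
    // Escape API output before it is placed in innerHTML.
    const esc = s => String(s).replace(/[&<>"']/g, c => '&#' + c.charCodeAt(0) + ';');

    async function signPayload() {
      // The token is signed on the server so the secret never reaches the browser.
      const r = await fetch(<?php echo wp_json_encode($token_url); ?>, {credentials: 'same-origin'});
      if (!r.ok) throw new Error('token');
      return r.text();
    }

    form.addEventListener('submit', async (e) => {
      e.preventDefault();
      const q = new FormData(form).get('q');
      out.textContent = 'Thinking…';
      try {
        const token = await signPayload();
        const r = await fetch(<?php echo wp_json_encode($atts['endpoint']); ?>, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + token
          },
          body: JSON.stringify({q})
        });
        if (!r.ok) { out.textContent = 'Error. Try again.'; return; }
        const data = await r.json();
        const cites = (data.citations || []).map(c => `- ${c.title}${c.url ? ' (' + c.url + ')' : ''}`).join('\n');
        out.innerHTML = `
          <p>${esc(data.answer || '').replace(/\n/g, '<br>')}</p>
          <pre>${esc(cites)}</pre>
        `;
      } catch (e) { out.textContent = 'Network error.'; }
    });
  })();
  </script>
  <?php
  return ob_get_clean();
});

// Settings page (Settings > RAG Client) that stores the backend endpoint.
// The 'rag_client' settings group and 'rag-client' menu slug are assumed names.
add_action('admin_init', function () {
  register_setting('rag_client', 'rag_client_endpoint', ['sanitize_callback' => 'esc_url_raw']);
});
add_action('admin_menu', function () {
  add_options_page('RAG Client', 'RAG Client', 'manage_options', 'rag-client', function () { ?>
    <div class="wrap">
      <h1>RAG Client</h1>
      <form method="post" action="options.php">
        <?php settings_fields('rag_client'); ?>
        <input type="url" name="rag_client_endpoint" value="<?php echo esc_attr(get_option('rag_client_endpoint', '')); ?>" style="width:420px;" required/>
        <p class="description">Example: https://api.example.com/api/rag/query</p>
        <?php submit_button(); ?>
      </form>
    </div>
  <?php });
});

// Token endpoint: issues an HS256 JWT that expires after 60 seconds.
// The 'iss' claim name is assumed (the standard JWT issuer claim).
function rag_client_token() {
  $payload = ['iss' => site_url(), 'iat' => time(), 'exp' => time() + 60];
  $header = ['alg' => 'HS256', 'typ' => 'JWT'];
  $seg = function ($x) { return rtrim(strtr(base64_encode(json_encode($x)), '+/', '-_'), '='); };
  $signing_input = $seg($header) . '.' . $seg($payload);
  $sig = hash_hmac('sha256', $signing_input, get_option('rag_client_local_hmac', 'local-demo'), true);
  $token = $signing_input . '.' . rtrim(strtr(base64_encode($sig), '+/', '-_'), '=');
  header('Content-Type: text/plain');
  echo $token;
  wp_die();
}
// Serve the token to logged-in users and anonymous visitors alike.
add_action('wp_ajax_rag_client_token', 'rag_client_token');
add_action('wp_ajax_nopriv_rag_client_token', 'rag_client_token');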
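
For reference, a token in this format can be checked on the Python side with the standard library alone. The sketch below assumes the backend holds the same key as the rag_client_local_hmac option; verify_token and make_token are hypothetical helper names, and make_token exists only to mirror the PHP signer for testing. A maintained library such as PyJWT is a reasonable alternative in production.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url_decode(segment: str) -> bytes:
    # Restore the padding that the PHP side strips with rtrim(..., '=').
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def verify_token(token: str, secret: bytes, now=None) -> dict:
    """Return the payload if the HS256 signature and exp check pass, else raise ValueError."""
    try:
        header_seg, payload_seg, sig_seg = token.split(".")
        header = json.loads(_b64url_decode(header_seg))
        payload = json.loads(_b64url_decode(payload_seg))
        signature = _b64url_decode(sig_seg)
        if not isinstance(header, dict) or not isinstance(payload, dict):
            raise ValueError("not a JSON object")
    except Exception as exc:
        raise ValueError("malformed token") from exc
    if header.get("alg") != "HS256":
        raise ValueError("unexpected alg")
    signing_input = f"{header_seg}.{payload_seg}".encode("ascii")
    expected = hmac.new(secret, signing_input, hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(expected, signature):
        raise ValueError("bad signature")
    now = time.time() if now is None else now
    exp = payload.get("exp")
    if not isinstance(exp, (int, float)) or exp < now:
        raise ValueError("expired")
    return payload


def make_token(payload: dict, secret: bytes) -> str:
    """Mirror of the PHP signer, used here only to exercise verify_token."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url_encode(json.dumps(part).encode("utf-8")) for part in (header, payload)
    )
    sig = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return signing_input + "." + _b64url_encode(sig)
```

Verification works on the raw token segments, so differences between PHP and Python JSON encoding (escaped slashes, spacing) do not affect the signature check.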

Security note:
– Do not hardcode secrets in JS.
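– Store the local HMAC key in WP options, or define it in wp-config.php and copy it into the option once:
define('RAG_CLIENT_LOCAL_HMAC', 'long-random');
update_option('rag_client_local_hmac', RAG_CLIENT_LOCAL_HMAC);
– Set the key before going live; the 'local-demo' fallback in the token handler is for demos only.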

Usage in posts/pages
Add the shortcode to any post or page (the shortcode is [rag_ask], not [ragexample]):
[rag_ask]

13) Hardening and ops
– Use HTTPS end-to-end. Restrict the Django endpoint so it accepts requests only from your WP origin(s), via firewall or middleware.
– Configure CORS to allow only your domain.
– Set timeouts: embedding 10s, completion 20–30s.
– Retries: exponential backoff (at most 2 retries) on 429/5xx.
– Observability: log query latency, retrieval hits, tokens used, cache hit rate.
– Data retention: do not log raw user questions if sensitive.
– Backups: nightly Postgres base + WAL archiving.
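
The retry rule above can be sketched as follows. This is a minimal illustration, not tied to any particular HTTP client: UpstreamError and call_with_retries are hypothetical names, and the 0.5s base delay and 25% jitter are assumed values.

```python
import random
import time


class UpstreamError(Exception):
    """Hypothetical error carrying the HTTP status returned by the embedding or completion API."""

    def __init__(self, status: int):
        super().__init__(f"upstream returned {status}")
        self.status = status


def call_with_retries(fn, max_retries=2, base_delay=0.5, sleep=time.sleep):
    """Call fn(); on 429/5xx retry up to max_retries times with exponential backoff plus jitter."""
    attempt = 0
    while True:
        try:
            return fn()
        except UpstreamError as exc:
            retryable = exc.status == 429 or 500 <= exc.status < 600
            if not retryable or attempt >= max_retries:
                raise
            # Delay doubles per attempt (0.5s, 1s, ...) with up to 25% extra
            # random jitter so that clients do not retry in lockstep.
            delay = base_delay * (2 ** attempt) * (1 + random.uniform(0, 0.25))
            sleep(delay)
            attempt += 1
```

Wrap the embedding and completion calls in this helper, and keep the per-call timeouts from the list above so that a retried request cannot exceed the proxy timeout unnoticed.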

14) Quick ingestion example
Create a management command in rag/management/commands/ (with an __init__.py in both management/ and commands/):

rag/management/commands/ingest_demo.py
from django.core.management.base import BaseCommand
from rag.ingest import ingest_document

SAMPLE = """Your docs or policy text here…"""


class Command(BaseCommand):
    help = "Ingest a sample document into the RAG index."

    def handle(self, *args, **kwargs):
        # ingest_document is the helper defined in rag/ingest.py.
        ingest_document("sample-001", "Sample Docs", "https://example.com/docs", SAMPLE)
        self.stdout.write(self.style.SUCCESS("Ingested"))

Run:
python manage.py ingest_demo

15) Simple load test
– Insert 10–50 docs, 200–1,000 chunks.
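– Run hey with a fresh token in $TOKEN (the plugin issues tokens with a 60-second expiry):
hey -n 200 -c 10 -m POST -T "application/json" -H "Authorization: Bearer $TOKEN" -D body.json https://api.example.com/api/rag/query

body.json:
{"q":"What is covered by our policy?"}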

Expect p95 < 2.5s with warmed caches and IVFFLAT.

16) Cost and performance tips
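– Use text-embedding-3-small if its quality is acceptable; reduce the embedding dimensions to lower memory use and speed up ANN search (the vector(N) column must match).
– Pre-filter by metadata (doc type, section).
– Cache answers. Deduplicate contexts.
– Tune IVFFlat lists (64–256) and probes (SET LOCAL ivfflat.probes = N inside a transaction, with N between 5 and 20).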
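
As one illustration of "deduplicate contexts", the sketch below drops retrieved chunks whose text is identical after whitespace and case normalization. It assumes duplicates are exact after normalization; near-duplicate detection would need a different technique. dedupe_contexts is a hypothetical helper name.

```python
import hashlib
import re


def dedupe_contexts(chunks):
    """Drop chunks whose normalized text was already seen, keeping first occurrences in order."""
    seen = set()
    unique = []
    for chunk in chunks:
        # Collapse whitespace and case-fold so trivially different copies match.
        normalized = re.sub(r"\s+", " ", chunk).strip().casefold()
        fingerprint = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        unique.append(chunk)
    return unique
```

Running this on the retrieved chunks before building the prompt keeps repeated boilerplate (headers, footers, overlapping chunk windows with identical text) from consuming tokens.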

That’s it. You now have a deployable RAG API with a clean WordPress integration.