At W3cert, we’re building AI-powered SaaS applications. Our stack splits the workload: Python handles the AI/ML logic, and Go handles the high-throughput microservices. Here’s how we architect it and the lessons we’ve learned.
Why Two Languages?
Python is unbeatable for AI/ML — the ecosystem (LangChain, Hugging Face, scikit-learn, pandas) is massive. But the GIL makes Python a poor fit for highly concurrent HTTP serving. Go gives us the performance we need for APIs that handle thousands of requests per second.
The split is clean:
| Concern | Language | Why |
|---|---|---|
| LLM integration | Python | LangChain, OpenAI SDK |
| RAG pipelines | Python | Vector DBs, embeddings |
| Data processing | Python | pandas, numpy |
| API gateway | Go | Performance, concurrency |
| Real-time services | Go | goroutines, low latency |
| Background jobs | Go | Worker pools, scheduling |
RAG Pipeline Architecture
Our Retrieval-Augmented Generation pipeline processes documents and answers questions using company data:
```python
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma


class DocumentProcessor:
    def __init__(self):
        # Overlapping chunks preserve context across split boundaries.
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", ". ", " ", ""],
        )
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
        self.vectorstore = Chroma(
            persist_directory="./chroma_db",
            embedding_function=self.embeddings,
        )

    def ingest(self, documents: list[str], metadata: list[dict]) -> None:
        """Split documents into chunks and index them with their metadata."""
        chunks = []
        chunk_metadata = []
        for doc, meta in zip(documents, metadata):
            splits = self.splitter.split_text(doc)
            chunks.extend(splits)
            # Every chunk inherits the metadata of its parent document.
            chunk_metadata.extend([meta] * len(splits))
        self.vectorstore.add_texts(texts=chunks, metadatas=chunk_metadata)

    def query(self, question: str, k: int = 5) -> list[dict]:
        """Return the k most similar chunks with their similarity scores."""
        results = self.vectorstore.similarity_search_with_score(question, k=k)
        return [
            {
                "content": doc.page_content,
                "metadata": doc.metadata,
                "score": float(score),
            }
            for doc, score in results
        ]
```
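Using it end to end is a two-step affair: ingest once, then query. A minimal sketch (the sample document and metadata here are made up for illustration):

```python
processor = DocumentProcessor()

# Hypothetical document; in production these come from our ingestion jobs.
processor.ingest(
    documents=["Refunds are processed within 5 business days of approval."],
    metadata=[{"source": "refund-policy.md"}],
)

for hit in processor.query("How long do refunds take?", k=3):
    print(hit["score"], hit["content"][:80])
```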
Go API Gateway
The Go service acts as the entry point for all client requests. It validates input, enforces rate limits, and routes to the appropriate backend:
```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"log"
	"net/http"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"
	"github.com/go-chi/httprate"
)

// validate tags are consumed by our input-validation layer (not shown).
type QueryRequest struct {
	Question  string `json:"question" validate:"required,min=3"`
	SessionID string `json:"session_id" validate:"required,uuid"`
}

// Source fields are illustrative; they mirror what the Python service returns.
type Source struct {
	Content string  `json:"content"`
	Score   float64 `json:"score"`
}

type QueryResponse struct {
	Answer  string   `json:"answer"`
	Sources []Source `json:"sources"`
	Latency float64  `json:"latency_ms"`
}

func main() {
	r := chi.NewRouter()
	r.Use(middleware.Logger)
	r.Use(middleware.Recoverer)
	r.Use(middleware.Timeout(30 * time.Second))
	r.Use(httprate.LimitByIP(100, 1*time.Minute)) // 100 req/min per client IP

	r.Post("/api/v1/query", handleQuery)
	r.Get("/api/v1/health", handleHealth)

	log.Println("Starting server on :8080")
	log.Fatal(http.ListenAndServe(":8080", r))
}

func handleQuery(w http.ResponseWriter, r *http.Request) {
	start := time.Now()

	var req QueryRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	// Call the Python RAG service; cap the call well below the 30s middleware timeout.
	ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
	defer cancel()

	answer, sources, err := callRAGService(ctx, req.Question, req.SessionID)
	if err != nil {
		http.Error(w, "Failed to process query", http.StatusInternalServerError)
		return
	}

	resp := QueryResponse{
		Answer:  answer,
		Sources: sources,
		Latency: float64(time.Since(start).Milliseconds()),
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(resp)
}

func handleHealth(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("ok"))
}

// callRAGService wraps the generated gRPC client for the Python service
// (the contract is shown in the next section); stubbed in this listing.
func callRAGService(ctx context.Context, question, sessionID string) (string, []Source, error) {
	return "", nil, errors.New("gRPC client elided in this listing")
}
```
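A quick end-to-end smoke test of the gateway only needs an HTTP client. A hedged sketch using Python's requests library (the question is a placeholder and the session ID is freshly generated; endpoint and payload match the handler above):

```python
import uuid

import requests

resp = requests.post(
    "http://localhost:8080/api/v1/query",
    json={
        "question": "How long do refunds take?",
        "session_id": str(uuid.uuid4()),  # must be a valid UUID
    },
    timeout=30,
)
resp.raise_for_status()
body = resp.json()
print(body["answer"], f"({body['latency_ms']:.0f} ms)")
```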
Communication Between Services
The Go gateway communicates with the Python service via gRPC for low-latency calls and Redis pub/sub for async jobs:
```protobuf
syntax = "proto3";

package ai;

service RAGService {
  rpc Query (QueryRequest) returns (QueryResponse);
  rpc IngestDocument (IngestRequest) returns (IngestResponse);
}

message QueryRequest {
  string question = 1;
  string session_id = 2;
  int32 max_results = 3;
}

message QueryResponse {
  string answer = 1;
  repeated Source sources = 2;
  float confidence = 3;
}

// Source's fields here are illustrative; they mirror what the Python
// retriever returns. IngestRequest/IngestResponse are omitted here.
message Source {
  string content = 1;
  float score = 2;
}
```
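On the Python side, the service implements the generated servicer interface. A minimal sketch, assuming the stubs were generated with grpcio-tools into `ai_pb2` / `ai_pb2_grpc` (module names are assumptions, and `generate_answer` is a hypothetical stand-in for the LLM call over the retrieved chunks):

```python
from concurrent import futures

import grpc

import ai_pb2
import ai_pb2_grpc


class RAGServicer(ai_pb2_grpc.RAGServiceServicer):
    """Implements the RAGService contract defined in the proto above."""

    def __init__(self, processor):
        self.processor = processor  # the DocumentProcessor from earlier

    def Query(self, request, context):
        hits = self.processor.query(request.question, k=request.max_results or 5)
        # generate_answer (not shown) runs the LLM over the retrieved chunks.
        return ai_pb2.QueryResponse(
            answer=generate_answer(request.question, hits),
            sources=[
                ai_pb2.Source(content=h["content"], score=h["score"])
                for h in hits
            ],
        )


def serve() -> None:
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    ai_pb2_grpc.add_RAGServiceServicer_to_server(
        RAGServicer(DocumentProcessor()), server
    )
    server.add_insecure_port("[::]:50051")  # matches RAG_SERVICE_URL below
    server.start()
    server.wait_for_termination()
```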
gRPC gives us:
- Type-safe contracts between services
- Streaming for large responses
- ~10x faster than REST for inter-service communication
- Auto-generated client code in both Go and Python
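The async path is simpler: for ingestion jobs the gateway publishes to a Redis channel and a Python worker consumes it. A rough sketch of the worker loop (the channel name and payload shape are our own conventions, shown here as assumptions):

```python
import json

import redis

r = redis.Redis(host="redis", port=6379)
pubsub = r.pubsub()
pubsub.subscribe("ingest-jobs")  # channel name is an assumption

processor = DocumentProcessor()

for message in pubsub.listen():
    if message["type"] != "message":
        continue  # skip subscribe confirmations
    job = json.loads(message["data"])
    processor.ingest(
        documents=[job["document"]],
        metadata=[job.get("metadata", {})],
    )
```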
Cost Optimization
LLM API calls are expensive. Here’s how we reduced costs by 60%:
- Caching — Cache identical queries in Redis with a 1-hour TTL
- Smaller models for routing — Use GPT-3.5 to classify the query type and reserve GPT-4 for complex reasoning (sketched after the caching code below)
- Chunking strategy — Smaller, more precise chunks mean fewer tokens in the context window
- Embedding caching — Don’t re-embed documents that haven’t changed
```python
import hashlib
import json

import redis


class CachedRAG:
    def __init__(self, rag_service, redis_client: redis.Redis):
        self.rag = rag_service
        self.cache = redis_client

    def query(self, question: str) -> dict:
        # Key on a hash of the exact question text.
        cache_key = f"rag:{hashlib.sha256(question.encode()).hexdigest()}"
        cached = self.cache.get(cache_key)
        if cached:
            return json.loads(cached)
        result = self.rag.query(question)
        self.cache.setex(cache_key, 3600, json.dumps(result))  # 1-hour TTL
        return result
```
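Model routing (item 2 above) is the biggest single saving. A minimal sketch, assuming the OpenAI Python SDK; the routing prompt and the "simple"/"complex" labels are our own convention, not anything the API prescribes:

```python
from openai import OpenAI

client = OpenAI()

ROUTER_PROMPT = (
    "Classify the user question as 'simple' (factual lookup) or "
    "'complex' (multi-step reasoning). Reply with one word."
)


def answer(question: str, context: str) -> str:
    # The cheap model decides; the expensive model only runs when needed.
    route = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": ROUTER_PROMPT},
            {"role": "user", "content": question},
        ],
    ).choices[0].message.content.strip().lower()

    model = "gpt-4" if "complex" in route else "gpt-3.5-turbo"
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": f"Answer using this context:\n{context}"},
            {"role": "user", "content": question},
        ],
    )
    return response.choices[0].message.content
```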
Deployment
Both services run in Docker containers orchestrated with Docker Compose in staging and Kubernetes in production:
```yaml
services:
  gateway:
    build: ./gateway
    ports:
      - "8080:8080"
    environment:
      - RAG_SERVICE_URL=rag:50051
      - REDIS_URL=redis://redis:6379
  rag:
    build: ./rag-service
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - CHROMA_PATH=/data/chroma
    volumes:
      - chroma_data:/data/chroma
  redis:
    image: redis:alpine

# Named volumes must be declared at the top level.
volumes:
  chroma_data:
```
This architecture has scaled well for us. The Go gateway handles 5000+ req/s on a single instance, and the Python RAG service processes queries in under 2 seconds including the LLM call.