2026-03-28 ai

Building AI Applications with Python and Go Microservices

At W3cert, we’re building AI-powered SaaS applications. Our stack splits the workload: Python handles the AI/ML logic, and Go handles the high-throughput microservices. Here’s how we architect it and the lessons we’ve learned.

Why Two Languages?

Python is unbeatable for AI/ML: the ecosystem (LangChain, Hugging Face, scikit-learn, pandas) is massive. But the GIL and interpreter overhead make Python a poor fit for high-throughput, highly concurrent HTTP services. Go gives us the performance we need for APIs that handle thousands of requests per second.

The split is clean:

Concern              Language   Why
LLM integration      Python     LangChain, OpenAI SDK
RAG pipelines        Python     Vector DBs, embeddings
Data processing      Python     pandas, numpy
API gateway          Go         Performance, concurrency
Real-time services   Go         goroutines, low latency
Background jobs      Go         Worker pools, scheduling

RAG Pipeline Architecture

Our Retrieval-Augmented Generation pipeline processes documents and answers questions using company data:

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

class DocumentProcessor:
    def __init__(self):
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", ". ", " ", ""],
        )
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-small"
        )
        self.vectorstore = Chroma(
            persist_directory="./chroma_db",
            embedding_function=self.embeddings,
        )

    def ingest(self, documents: list[str], metadata: list[dict]):
        """Split documents into overlapping chunks and index them in Chroma."""
        chunks = []
        chunk_metadata = []

        for doc, meta in zip(documents, metadata):
            splits = self.splitter.split_text(doc)
            chunks.extend(splits)
            chunk_metadata.extend([meta] * len(splits))

        self.vectorstore.add_texts(
            texts=chunks,
            metadatas=chunk_metadata,
        )

    def query(self, question: str, k: int = 5) -> list[dict]:
        """Return the k most relevant chunks along with their scores."""
        results = self.vectorstore.similarity_search_with_score(
            question, k=k
        )
        return [
            {
                "content": doc.page_content,
                "metadata": doc.metadata,
                "score": float(score),
            }
            for doc, score in results
        ]
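
A quick usage sketch (the document text, metadata, and question here are illustrative, not real data):

processor = DocumentProcessor()

processor.ingest(
    documents=["Refund policy: refunds are issued within 14 business days of approval."],
    metadata=[{"source": "policies/refunds.md"}],
)

for hit in processor.query("How long do refunds take?", k=3):
    print(hit["score"], hit["metadata"]["source"])
    print(hit["content"][:200])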

Go API Gateway

The Go service acts as the entry point for all client requests. It validates input, enforces rate limits, and routes to the appropriate backend:

package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
    "github.com/go-chi/httprate"
)

type QueryRequest struct {
    Question  string `json:"question" validate:"required,min=3"`
    SessionID string `json:"session_id" validate:"required,uuid"`
}

type QueryResponse struct {
    Answer  string   `json:"answer"`
    Sources []Source `json:"sources"`
    Latency float64  `json:"latency_ms"`
}

func main() {
    r := chi.NewRouter()

    r.Use(middleware.Logger)
    r.Use(middleware.Recoverer)
    r.Use(middleware.Timeout(30 * time.Second))
    r.Use(httprate.LimitByIP(100, 1*time.Minute))

    r.Post("/api/v1/query", handleQuery)
    r.Get("/api/v1/health", handleHealth)

    log.Println("Starting server on :8080")
    log.Fatal(http.ListenAndServe(":8080", r))
}

func handleQuery(w http.ResponseWriter, r *http.Request) {
    start := time.Now()

    var req QueryRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "Invalid request body", http.StatusBadRequest)
        return
    }

    // Call the Python RAG service over gRPC; callRAGService wraps the generated client (omitted here)
    ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second)
    defer cancel()

    answer, sources, err := callRAGService(ctx, req.Question, req.SessionID)
    if err != nil {
        http.Error(w, "Failed to process query", http.StatusInternalServerError)
        return
    }

    resp := QueryResponse{
        Answer:  answer,
        Sources: sources,
        Latency: float64(time.Since(start).Milliseconds()),
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(resp)
}

Communication Between Services

The Go gateway communicates with the Python service via gRPC for low-latency calls and Redis pub/sub for async jobs:

syntax = "proto3";

package ai;

service RAGService {
  rpc Query (QueryRequest) returns (QueryResponse);
  rpc IngestDocument (IngestRequest) returns (IngestResponse);
}

message QueryRequest {
  string question = 1;
  string session_id = 2;
  int32 max_results = 3;
}

message QueryResponse {
  string answer = 1;
  repeated Source sources = 2;
  float confidence = 3;
}

// Referenced above; field names mirror what the Python service returns
// (content, metadata, score) and are illustrative.
message Source {
  string content = 1;
  string metadata = 2;
  float score = 3;
}

gRPC gives us:

  • Type-safe contracts between services
  • Streaming for large responses
  • ~10x faster than REST for inter-service communication
  • Auto-generated client code in both Go and Python
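
On the Python side, the service implements the generated servicer interface. A minimal sketch, assuming the stubs were generated with grpcio-tools as rag_pb2 / rag_pb2_grpc (module names depend on the .proto file name) and a hypothetical generate_answer helper that wraps the LLM call:

from concurrent import futures

import grpc

import rag_pb2
import rag_pb2_grpc


class RAGServicer(rag_pb2_grpc.RAGServiceServicer):
    def __init__(self, processor: DocumentProcessor):
        self.processor = processor

    def Query(self, request, context):
        # Vector search, then assemble the response message.
        results = self.processor.query(request.question, k=request.max_results or 5)
        return rag_pb2.QueryResponse(
            answer=generate_answer(request.question, results),  # hypothetical LLM helper
            sources=[
                rag_pb2.Source(content=r["content"], score=r["score"])
                for r in results
            ],
        )


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=8))
    rag_pb2_grpc.add_RAGServiceServicer_to_server(
        RAGServicer(DocumentProcessor()), server
    )
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()

For the async path, a worker subscribes to a Redis channel and runs ingestion in the background. A rough sketch, assuming the gateway publishes JSON payloads on an "ingest" channel (the channel name and payload shape are illustrative):

import json

import redis


def run_ingest_worker(processor: DocumentProcessor):
    r = redis.Redis(host="redis", port=6379)
    pubsub = r.pubsub()
    pubsub.subscribe("ingest")  # assumed channel name

    for message in pubsub.listen():
        if message["type"] != "message":
            continue  # skip subscribe confirmations
        job = json.loads(message["data"])
        processor.ingest(
            documents=[job["text"]],
            metadata=[{"source": job["source"]}],
        )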

Cost Optimization

LLM API calls are expensive. Here’s how we reduced costs by 60%:

  1. Caching — Cache identical queries in Redis with a 1-hour TTL
  2. Smaller models for routing — Use GPT-3.5 to classify the query type and reserve GPT-4 for complex reasoning (a routing sketch follows the cache example below)
  3. Chunking strategy — Smaller, more precise chunks mean fewer tokens in the context window
  4. Embedding caching — Don't re-embed documents that haven't changed

The query cache from item 1 is a thin wrapper around the RAG service:
import hashlib
import json

import redis

class CachedRAG:
    def __init__(self, rag_service, redis_client: redis.Redis):
        self.rag = rag_service
        self.cache = redis_client

    def query(self, question: str) -> dict:
        cache_key = f"rag:{hashlib.sha256(question.encode()).hexdigest()}"

        cached = self.cache.get(cache_key)
        if cached:
            return json.loads(cached)

        result = self.rag.query(question)
        self.cache.setex(cache_key, 3600, json.dumps(result))
        return result
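
The model routing from item 2 looks roughly like this; the router prompt and model names are illustrative, and the thresholds are ours to tune:

from openai import OpenAI

client = OpenAI()

ROUTER_PROMPT = (
    "Classify the user question as SIMPLE (lookup, definition, yes/no) "
    "or COMPLEX (multi-step reasoning, comparison, synthesis). "
    "Reply with one word."
)

def route_and_answer(question: str, context: str) -> str:
    # A cheap model decides how hard the question is.
    label = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": ROUTER_PROMPT},
            {"role": "user", "content": question},
        ],
    ).choices[0].message.content.strip().upper()

    # Only pay for the larger model when the router says it is needed.
    model = "gpt-4" if label.startswith("COMPLEX") else "gpt-3.5-turbo"
    completion = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "Answer using only the provided context."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
        ],
    )
    return completion.choices[0].message.content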

Deployment

Both services run in Docker containers orchestrated with Docker Compose in staging and Kubernetes in production:

services:
  gateway:
    build: ./gateway
    ports:
      - "8080:8080"
    environment:
      - RAG_SERVICE_URL=rag:50051
      - REDIS_URL=redis://redis:6379

  rag:
    build: ./rag-service
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - CHROMA_PATH=/data/chroma
    volumes:
      - chroma_data:/data/chroma

  redis:
    image: redis:alpine

This architecture has scaled well for us. The Go gateway handles 5000+ req/s on a single instance, and the Python RAG service processes queries in under 2 seconds including the LLM call.