Pipeline Architecture

End-to-end data pipeline from API to dashboard

System Overview

Data flows left to right from the Alpha Vantage API through ingestion, transformation, and analytics layers to the interactive dashboard.

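[Diagram: pipeline nodes and the data passed between them]
Alpha Vantage (REST API)
  -> JSON responses -> Python Ingestion (Python / Requests)
  -> Raw OHLCV data -> PostgreSQL (PostgreSQL 16)
  -> Staged data -> Python Transform (Python / Pandas)
  -> Analytics tables -> ClickHouse (ClickHouse 24)
  -> SQL queries -> FastAPI (Python / FastAPI)
  -> REST API -> Next.js (Next.js 14 / React)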

How Data Flows

1

Data Ingestion

A Python script fetches daily OHLCV data from Alpha Vantage for each tracked symbol.

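# Fetch daily stock data
import requests

response = requests.get(
    "https://www.alphavantage.co/query",
    # API_KEY holds your Alpha Vantage key (variable name assumed here)
    params={"function": "TIME_SERIES_DAILY", "symbol": symbol, "apikey": API_KEY},
    timeout=30,
)
response.raise_for_status()  # surface HTTP errors before parsing
daily_data = response.json()["Time Series (Daily)"]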
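
The response maps each trading date to a dictionary of string fields, so it has to be flattened before it can be staged. The following is a minimal sketch of that step, not the pipeline's actual code: `parse_daily_series` is a hypothetical helper name, the field keys (`1. open` through `5. volume`) are assumed to match the `TIME_SERIES_DAILY` payload, and the sample values are made up for illustration.

```python
from datetime import date
from decimal import Decimal


def parse_daily_series(symbol: str, daily_data: dict) -> list[tuple]:
    """Flatten the date-keyed dict into (symbol, date, O, H, L, C, V) rows.

    Hypothetical helper; the "1. open" ... "5. volume" keys are assumed
    to match the TIME_SERIES_DAILY response format.
    """
    rows = []
    for day, fields in daily_data.items():
        rows.append((
            symbol,
            date.fromisoformat(day),
            Decimal(fields["1. open"]),
            Decimal(fields["2. high"]),
            Decimal(fields["3. low"]),
            Decimal(fields["4. close"]),
            int(fields["5. volume"]),
        ))
    # Sort oldest first so later rolling-window calculations see ordered data.
    rows.sort(key=lambda row: row[1])
    return rows


# Illustrative payload (not real quotes), newest day first.
sample = {
    "2024-01-03": {"1. open": "10.60", "2. high": "11.20", "3. low": "10.40",
                   "4. close": "11.00", "5. volume": "1500"},
    "2024-01-02": {"1. open": "10.00", "2. high": "11.00", "3. low": "9.50",
                   "4. close": "10.50", "5. volume": "1000"},
}
rows = parse_daily_series("DEMO", sample)
```

Parsing prices as `Decimal` rather than `float` is a design choice in this sketch: it preserves the exact string values until the database decides on a storage type.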
2

PostgreSQL Staging

Raw data is upserted into PostgreSQL, the OLTP staging layer. A unique key on (symbol, trade_date) deduplicates rows that are fetched more than once.

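-- Upsert raw stock prices (requires a unique constraint on symbol, trade_date)
INSERT INTO stock_prices (symbol, trade_date, open, high, low, close, volume)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (symbol, trade_date) DO UPDATE
SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low,
    close = EXCLUDED.close, volume = EXCLUDED.volume;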
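
To see the deduplication behavior without a running PostgreSQL server, the sketch below replays two overlapping loads through the same upsert pattern. It uses Python's built-in `sqlite3` as a stand-in, which is an assumption made purely so the example runs anywhere: SQLite accepts the same `ON CONFLICT ... DO UPDATE` clause but uses `?` placeholders instead of `%s`. The rows are invented for illustration.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE stock_prices (
        symbol TEXT NOT NULL,
        trade_date TEXT NOT NULL,
        open REAL, high REAL, low REAL, close REAL, volume INTEGER,
        PRIMARY KEY (symbol, trade_date)
    )
""")

UPSERT = """
    INSERT INTO stock_prices (symbol, trade_date, open, high, low, close, volume)
    VALUES (?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT (symbol, trade_date) DO UPDATE
    SET open = excluded.open, high = excluded.high, low = excluded.low,
        close = excluded.close, volume = excluded.volume
"""

first_load = [("DEMO", "2024-01-02", 10.0, 11.0, 9.5, 10.5, 1000)]
# A later run re-delivers the same day with corrected values, plus one new day.
second_load = [
    ("DEMO", "2024-01-02", 10.0, 11.0, 9.5, 10.6, 1200),
    ("DEMO", "2024-01-03", 10.6, 11.2, 10.4, 11.0, 1500),
]
conn.executemany(UPSERT, first_load)
conn.executemany(UPSERT, second_load)
conn.commit()

stored = conn.execute(
    "SELECT trade_date, close, volume FROM stock_prices ORDER BY trade_date"
).fetchall()
```

After both loads the table holds one row per trading day, and the overlapping day carries the values from the second load.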
3

Analytics Transform

Python calculates technical indicators: moving averages, MACD, RSI, and volume analysis.

# Calculate technical indicators with pandas
df["sma_20"] = df["close"].rolling(window=20).mean()
df["ema_12"] = df["close"].ewm(span=12).mean()
df["ema_26"] = df["close"].ewm(span=26).mean()  # required by the MACD line below
df["macd"] = df["ema_12"] - df["ema_26"]
df["rsi_14"] = compute_rsi(df["close"], period=14)  # helper defined elsewhere
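
The `compute_rsi` helper is not shown in the snippet above. As a rough, dependency-free illustration of what such a helper might compute, here is RSI with Wilder's smoothing over a plain list of closes. The function name `compute_rsi_values` and the choice of Wilder smoothing are assumptions; the pipeline's own helper may differ.

```python
from typing import Optional


def compute_rsi_values(closes: list[float], period: int = 14) -> list[Optional[float]]:
    """Wilder-smoothed RSI; entries before the first full window are None."""
    rsi: list[Optional[float]] = [None] * len(closes)
    if len(closes) <= period:
        return rsi

    # deltas[j] is the change that ends at closes[j + 1].
    deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
    gains = [max(d, 0.0) for d in deltas]
    losses = [max(-d, 0.0) for d in deltas]

    # Seed the averages with the simple mean of the first `period` changes.
    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period

    def to_rsi(gain: float, loss: float) -> float:
        if loss == 0.0:
            return 100.0  # no losses in the window: RSI saturates at 100
        return 100.0 - 100.0 / (1.0 + gain / loss)

    rsi[period] = to_rsi(avg_gain, avg_loss)
    for i in range(period + 1, len(closes)):
        # Wilder smoothing: blend the previous average with the newest change.
        avg_gain = (avg_gain * (period - 1) + gains[i - 1]) / period
        avg_loss = (avg_loss * (period - 1) + losses[i - 1]) / period
        rsi[i] = to_rsi(avg_gain, avg_loss)
    return rsi


rising = compute_rsi_values([float(i) for i in range(1, 31)])
falling = compute_rsi_values([float(i) for i in range(30, 0, -1)])
zigzag = compute_rsi_values([1.0, 2.0, 1.0, 2.0, 1.0], period=2)
```

A steadily rising series pins the indicator at 100 and a steadily falling one at 0, which makes for a quick sanity check. To use the result in a DataFrame, the list could be wrapped as `pd.Series(values, index=df.index)`.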
4

ClickHouse OLAP

Transformed analytics data is loaded into ClickHouse for fast columnar queries.

-- ClickHouse MergeTree table for fast analytics
CREATE TABLE stock_analytics (
    symbol LowCardinality(String),
    trade_date Date,
    sma_20 Float64,
    rsi_14 Float64
) ENGINE = MergeTree()
ORDER BY (symbol, trade_date);
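
ClickHouse generally handles a few large inserts better than many small ones, so the load step is a natural place to batch rows. The sketch below shows one way to do that under stated assumptions: `insert_batch` is a hypothetical callable wrapping whichever ClickHouse client the pipeline uses, and the batch size is an arbitrary illustrative default.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator

Row = tuple  # e.g. (symbol, trade_date, sma_20, rsi_14)


def batched(rows: Iterable[Row], size: int) -> Iterator[list[Row]]:
    """Yield successive lists of at most `size` rows."""
    iterator = iter(rows)
    while batch := list(islice(iterator, size)):
        yield batch


def load_in_batches(
    rows: Iterable[Row],
    insert_batch: Callable[[list[Row]], None],  # hypothetical client wrapper
    batch_size: int = 10_000,
) -> int:
    """Pass rows to `insert_batch` in chunks and return the total row count."""
    total = 0
    for batch in batched(rows, batch_size):
        insert_batch(batch)
        total += len(batch)
    return total


# Demo: collect batches in a list instead of talking to a database.
received: list[list[Row]] = []
demo_rows = [("DEMO", day, 0.0, 50.0) for day in range(25)]
loaded = load_in_batches(demo_rows, received.append, batch_size=10)
batch_sizes = [len(batch) for batch in received]
```

Returning the row count from the loader also gives the run-tracking layer a number to record for each load.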
5

FastAPI Backend

The REST API layer queries ClickHouse and serves the results to the frontend, with caching.

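@app.get("/api/pipeline/stocks/{symbol}/analytics")
async def get_analytics(symbol: str, days: int = 90):
    query = """
        SELECT * FROM stock_analytics
        WHERE symbol = {symbol:String}
        ORDER BY trade_date DESC LIMIT {days:UInt32}
    """
    # Bind the placeholders; `clickhouse` is the app's client wrapper and the
    # `params` keyword is assumed here.
    return await clickhouse.execute(query, params={"symbol": symbol, "days": days})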
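
The endpoint above does not show how the caching works. One simple possibility is a small in-process cache with a time-to-live, sketched below; the `TTLCache` class, the 60-second TTL, and the `(symbol, days)` cache key are all assumptions for illustration rather than the backend's actual mechanism.

```python
import time
from typing import Any, Callable, Hashable


class TTLCache:
    """Tiny in-process cache whose entries expire after `ttl_seconds`."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: dict[Hashable, tuple[float, Any]] = {}

    def get_or_compute(self, key: Hashable, compute: Callable[[], Any]) -> Any:
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]  # still fresh: skip the database round trip
        value = compute()
        self._entries[key] = (now, value)
        return value


# Demo with a fake clock and a counter standing in for the ClickHouse query.
fake_now = [0.0]
cache = TTLCache(ttl_seconds=60, clock=lambda: fake_now[0])
calls: list[int] = []


def run_query() -> dict:
    calls.append(1)
    return {"symbol": "DEMO", "query_number": len(calls)}


first = cache.get_or_compute(("DEMO", 90), run_query)
second = cache.get_or_compute(("DEMO", 90), run_query)  # served from cache
fake_now[0] = 61.0
third = cache.get_or_compute(("DEMO", 90), run_query)  # expired, recomputed
```

Because daily bars change at most once per trading day, even a short TTL like this can absorb most repeated dashboard requests.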
6

Next.js Dashboard

An interactive React dashboard renders charts with Chart.js and refreshes them with real-time data updates.

// Fetch and render stock analytics
const [ohlcv, analytics, volume] = await Promise.all([
  getStockOHLCV(symbol, days),
  getStockAnalytics(symbol, days),
  getStockVolume(symbol, days),
]);
return <PriceChart data={ohlcv} analytics={analytics} />;

Why This Architecture

OLTP: PostgreSQL

PostgreSQL handles transactional workloads -- ingesting and deduplicating raw stock data with row-level ACID guarantees. Its upsert capability ensures data integrity during incremental loads. PostgreSQL also serves as the pipeline metadata store for run tracking.

OLAP: ClickHouse

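ClickHouse is a columnar database optimized for analytical queries. Scans over millions of rows for time-series aggregations, moving averages, and technical indicators can run 10-100x faster than in PostgreSQL, depending on the query and data volume. The MergeTree engine provides efficient compression, and its sorting key on (symbol, trade_date) lets date-range queries skip irrelevant data; partition pruning applies as well when the table is partitioned by date.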

Separation of Concerns

Keeping OLTP and OLAP databases separate prevents analytical queries from impacting ingestion performance. Each database is tuned for its workload -- PostgreSQL for writes and ClickHouse for reads. The Python transform layer bridges the two, computing derived metrics before loading into the analytics store.

Pipeline Observability

Every pipeline run is tracked with row counts, timing, and error messages. The health endpoint monitors both database connections and data freshness. The dashboard surfaces these metrics so operators can detect stale data or failed runs immediately without checking logs.
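
As a rough illustration of the run tracking and freshness check described above, the sketch below records status, row count, duration, and error message for one stage, and flags data as stale when the newest stored bar is too old. The names `PipelineRun`, `track_run`, and `is_stale`, along with the three-day default threshold, are assumptions and not the pipeline's actual schema.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Iterator, Optional


@dataclass
class PipelineRun:
    stage: str
    rows_processed: int = 0
    duration_seconds: float = 0.0
    status: str = "running"
    error_message: Optional[str] = None


@contextmanager
def track_run(stage: str) -> Iterator[PipelineRun]:
    """Record timing and outcome for one stage; failures are captured, then re-raised."""
    run = PipelineRun(stage=stage)
    started = time.monotonic()
    try:
        yield run
        run.status = "success"
    except Exception as exc:
        run.status = "failed"
        run.error_message = str(exc)
        raise
    finally:
        run.duration_seconds = time.monotonic() - started


def is_stale(latest_trade_date: date, today: date, max_age_days: int = 3) -> bool:
    """True when the newest stored bar is older than the allowed age (assumed default)."""
    return today - latest_trade_date > timedelta(days=max_age_days)


with track_run("ingest") as ok_run:
    ok_run.rows_processed = 100

try:
    with track_run("transform") as bad_run:
        raise ValueError("missing close column")
except ValueError:
    pass  # the failure is already recorded on bad_run
```

In a real deployment each `PipelineRun` would be written to the metadata store so the health endpoint and dashboard can read it back.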