
DGM Adapters

A DGM (Data Gateway Manager) plugin normalizes external data sources into the platform's canonical schema. Each DGM instance corresponds to one external data provider (e.g., Bloomberg, Polygon, Databento, ICE). The DGM translates vendor-specific formats into kernel-canonical events.


Three-Stage Pipeline

Every DGM plugin implements a three-stage pipeline. The sidecar orchestrates the stages; the plugin must not call them directly.
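The contract can be illustrated with a minimal driver loop. This is a sketch of what the sidecar does, not SDK code; the function name `run_batch_request` is hypothetical:

```python
# Hypothetical sketch of how the sidecar drives the three stages.
# The plugin only implements the stage methods; it never calls them itself.
def run_batch_request(plugin, subscription_request, credentials):
    query = plugin.transform_query(subscription_request)  # Stage 1: kernel request -> vendor query
    raw = plugin.extract_data(query, credentials)         # Stage 2: fetch (credentials injected here only)
    return plugin.transform_data(query, raw)              # Stage 3: vendor bytes -> canonical events
```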

Stage 1: transform_query

Translates a kernel subscription request into a vendor-specific query. Maps kernel instrument identifiers to the provider's symbology.

def transform_query(self, subscription_request):
    vendor_symbol = self._resolve_symbol(subscription_request.canonical_id)
    return NormalizedQuery(
        vendor_symbol=vendor_symbol,
        fields=["PX_LAST", "PX_OPEN", "PX_HIGH", "PX_LOW", "VOLUME"],
        start_date=subscription_request.start,
        end_date=subscription_request.end,
    )

Stage 2: extract_data

Fetches data from the external provider. Credentials are injected by the sidecar for this call only. The plugin must not cache credentials beyond a single invocation. All network egress passes through the sidecar's Network Egress Proxy.

def extract_data(self, normalized_query, credentials):
    client = VendorAPI(api_key=credentials["api_key"])
    raw = client.fetch(
        symbol=normalized_query.vendor_symbol,
        fields=normalized_query.fields,
        start=normalized_query.start_date,
        end=normalized_query.end_date,
    )
    return raw  # opaque bytes

Stage 3: transform_data

Transforms vendor-specific data into kernel-canonical events. Every event must include ts_event and ts_init timestamps.

def transform_data(self, normalized_query, raw_response):
    rows = self._parse_vendor_format(raw_response)
    events = []
    for row in rows:
        events.append(NormalizedEvent(
            canonical_id=normalized_query.canonical_id,
            ts_event=row["timestamp"],
            ts_init=time.time_ns(),  # capture time in epoch nanoseconds
            fields={
                "open": row["PX_OPEN"],
                "high": row["PX_HIGH"],
                "low": row["PX_LOW"],
                "close": row["PX_LAST"],
                "volume": row["VOLUME"],
            },
        ))
    return events

Streaming Hooks

DGM plugins that support real-time streaming implement these additional methods:

on_subscribe

Initiates a streaming subscription at the specified resolution.

def on_subscribe(self, symbol, resolution):
    vendor_symbol = self._resolve_symbol(symbol)
    self.feed.subscribe(vendor_symbol, resolution)

on_unsubscribe

Terminates a streaming subscription.

def on_unsubscribe(self, symbol):
    vendor_symbol = self._resolve_symbol(symbol)
    self.feed.unsubscribe(vendor_symbol)

on_tick

Normalizes a single raw tick from the vendor feed. Must include ts_event and ts_init.

def on_tick(self, raw_tick):
    return NormalizedTick(
        canonical_id=self._resolve_symbol(raw_tick.symbol),
        ts_event=raw_tick.exchange_timestamp,
        ts_init=time.time_ns(),  # capture time in epoch nanoseconds
        price=raw_tick.last_price,
        size=raw_tick.last_size,
        conditions=raw_tick.conditions,
    )

Subscription management is handled by the sidecar. The plugin must not maintain its own subscription registry. A DGM plugin may support batch only, streaming only, or both (declared in the capability manifest).


Feed Status

DGM plugins must publish feed status updates to platform.data.feed.status whenever connection state changes.

self.publish_feed_status(
    feed_id=self.instance_id,
    status="CONNECTED",          # CONNECTED | DISCONNECTED | DEGRADED | RECONNECTING
    symbols_count=len(self.active_subscriptions),
)

On DEGRADED or DISCONNECTED, the plugin must also invoke on_degrade(reason) on the base lifecycle.


Capabilities Declaration

Every DGM plugin declares its capabilities at registration:

class MyVendorDGM(DGMPlugin):
    vendor_name = "polygon"
    supported_data_types = ["time_series", "snapshot"]
    supported_asset_classes = ["EQUITY", "FX", "CRYPTO"]
    supported_identifiers = ["TICKER", "FIGI"]
    supports_streaming = True
    supports_historical = True
    min_resolution = "tick"
    max_resolution = "1M"

| Capability              | Description                                                                                                        |
| ----------------------- | ------------------------------------------------------------------------------------------------------------------ |
| vendor_name             | Unique vendor identifier (e.g., "bloomberg", "polygon", "databento")                                               |
| supported_data_types    | Data shapes produced: time_series, snapshot, matrix, tabular, document, image, audio, geospatial, graph, event_log |
| supported_asset_classes | Asset classes covered: EQUITY, FIXED_INCOME, FX, DERIVATIVE, COMMODITY, CRYPTO                                     |
| supported_identifiers   | External identifier types the plugin can resolve: TICKER, FIGI, ISIN, CUSIP, SEDOL, RIC, BLOOMBERG_ID              |
| supports_streaming      | Whether the plugin supports real-time data                                                                         |
| supports_historical     | Whether the plugin supports historical/batch queries                                                               |

Field Mapping

Each DGM plugin provides a field map declaring how vendor-native field names map to canonical field names. The sidecar validates the field map at registration.

field_map = {
    "PX_LAST": "close",
    "PX_OPEN": "open",
    "PX_HIGH": "high",
    "PX_LOW": "low",
    "VOLUME": "volume",
    "PX_BID": "bid",
    "PX_ASK": "ask",
    "CUR_MKT_CAP": "market_cap",
}

Every published data point carries FieldSource metadata identifying the vendor, vendor field name, and vendor timestamp. This enables the vendor priority waterfall and audit trail.
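Applying the field map and attaching per-field source metadata can be sketched as below. The dict shape used for the FieldSource metadata is an assumption for illustration; consult the SDK for the actual type:

```python
# Illustrative: map one vendor row through a field map and record, for
# each canonical field, which vendor field and timestamp it came from.
def map_row(field_map, vendor_name, row, vendor_ts):
    fields, sources = {}, {}
    for vendor_field, canonical in field_map.items():
        if vendor_field in row:
            fields[canonical] = row[vendor_field]
            sources[canonical] = {
                "vendor": vendor_name,
                "vendor_field": vendor_field,
                "vendor_ts": vendor_ts,
            }
    return fields, sources
```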


Credential Injection

Credentials are injected by the sidecar into the extract_data call only. The plugin must not:

  • Store credentials in memory beyond a single extract_data invocation
  • Log credentials at any level
  • Transmit credentials to any endpoint other than the vendor API (via the egress proxy)
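These rules can be made hard to violate by accident. A purely illustrative sketch (not part of the SDK) of a scoped wrapper that invalidates itself after the invocation and keeps the secret out of logs:

```python
class ScopedCredentials:
    # Illustrative guard: usable within one invocation, then closed,
    # so accidental caching across invocations fails loudly.
    def __init__(self, values):
        self._values = dict(values)

    def take(self, key):
        if self._values is None:
            raise RuntimeError("credentials already consumed for this invocation")
        return self._values[key]

    def close(self):
        self._values = None

    def __repr__(self):
        # Keep secrets out of logs and tracebacks.
        return "ScopedCredentials(<redacted>)"
```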

Full Example

from datetime import datetime

from meridian.sdk import DGMPlugin, NormalizedEvent, NormalizedQuery, NormalizedTick

class PolygonDGM(DGMPlugin):
    vendor_name = "polygon"
    supported_data_types = ["time_series"]
    supported_asset_classes = ["EQUITY"]
    supports_streaming = True
    supports_historical = True

    def transform_query(self, subscription_request):
        return NormalizedQuery(
            vendor_symbol=subscription_request.symbol,
            fields=["o", "h", "l", "c", "v"],
            start_date=subscription_request.start,
            end_date=subscription_request.end,
        )

    def extract_data(self, query, credentials):
        import requests
        resp = requests.get(
            f"https://api.polygon.io/v2/aggs/ticker/{query.vendor_symbol}/range/1/day/{query.start_date}/{query.end_date}",
            headers={"Authorization": f"Bearer {credentials['api_key']}"},
            timeout=30,  # never hang the sidecar on a stalled vendor endpoint
        )
        resp.raise_for_status()  # surface HTTP errors instead of returning error bodies as data
        return resp.content

    def transform_data(self, query, raw_response):
        import json
        data = json.loads(raw_response)
        events = []
        for bar in data.get("results", []):
            events.append(NormalizedEvent(
                canonical_id=query.canonical_id,
                ts_event=bar["t"],
                ts_init=int(datetime.utcnow().timestamp() * 1e9),
                fields={"open": bar["o"], "high": bar["h"], "low": bar["l"], "close": bar["c"], "volume": bar["v"]},
            ))
        return events

    def on_subscribe(self, symbol, resolution):
        self.ws.subscribe(f"A.{symbol}")

    def on_unsubscribe(self, symbol):
        self.ws.unsubscribe(f"A.{symbol}")

    def on_tick(self, raw_tick):
        return NormalizedTick(
            canonical_id=raw_tick["sym"],
            ts_event=raw_tick["t"],
            ts_init=int(datetime.utcnow().timestamp() * 1e9),
            price=raw_tick["p"],
            size=raw_tick["s"],
        )

Bus Topics

| Topic                                       | Direction | Description               |
| ------------------------------------------- | --------- | ------------------------- |
| platform.dgm.{id}.data.price.eod            | Publish   | End-of-day price events   |
| platform.dgm.{id}.data.price.tick           | Publish   | Real-time tick events     |
| platform.dgm.{id}.data.reference.instrument | Publish   | Instrument reference data |
| platform.dgm.{id}.data.alt.{subtype}        | Publish   | Alternative data events   |
| platform.data.feed.status                   | Publish   | Feed status changes       |
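Plugin-scoped topic strings follow the `platform.dgm.{id}.` pattern above; a small builder (the helper name is illustrative) avoids hand-assembled strings:

```python
def dgm_topic(instance_id, *segments):
    # Build a plugin-scoped bus topic, e.g. platform.dgm.{id}.data.price.eod
    return ".".join(["platform", "dgm", instance_id, *segments])
```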