DGM Adapters¶
A DGM (Data Gateway Manager) plugin normalizes external data sources into the platform's canonical schema. Each DGM instance corresponds to one external data provider (e.g., Bloomberg, Polygon, Databento, ICE). The DGM translates vendor-specific formats into kernel-canonical events.
Three-Stage Pipeline¶
Every DGM plugin implements a three-stage pipeline. The sidecar orchestrates the stages; the plugin must not call them directly.
Stage 1: transform_query¶
Translates a kernel subscription request into a vendor-specific query. Maps kernel instrument identifiers to the provider's symbology.
```python
def transform_query(self, subscription_request):
    vendor_symbol = self._resolve_symbol(subscription_request.canonical_id)
    return NormalizedQuery(
        vendor_symbol=vendor_symbol,
        fields=["PX_LAST", "PX_OPEN", "PX_HIGH", "PX_LOW", "VOLUME"],
        start_date=subscription_request.start,
        end_date=subscription_request.end,
    )
```
Stage 2: extract_data¶
Fetches data from the external provider. Credentials are injected by the sidecar for this call only. The plugin must not cache credentials beyond a single invocation. All network egress passes through the sidecar's Network Egress Proxy.
```python
def extract_data(self, normalized_query, credentials):
    client = VendorAPI(api_key=credentials["api_key"])
    raw = client.fetch(
        symbol=normalized_query.vendor_symbol,
        fields=normalized_query.fields,
        start=normalized_query.start_date,
        end=normalized_query.end_date,
    )
    return raw  # opaque bytes
```
Stage 3: transform_data¶
Transforms vendor-specific data into kernel-canonical events. Every event must include ts_event and ts_init timestamps.
```python
def transform_data(self, normalized_query, raw_response):
    rows = self._parse_vendor_format(raw_response)
    events = []
    for row in rows:
        events.append(NormalizedEvent(
            canonical_id=normalized_query.canonical_id,
            ts_event=row["timestamp"],
            ts_init=datetime.utcnow(),
            fields={
                "open": row["PX_OPEN"],
                "high": row["PX_HIGH"],
                "low": row["PX_LOW"],
                "close": row["PX_LAST"],
                "volume": row["VOLUME"],
            },
        ))
    return events
```
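Putting the three stages together: the sidecar calls them in order, injecting credentials only for stage 2. The `Sidecar` class below is an illustrative sketch of that orchestration, not part of the SDK; plugins never invoke the stages themselves.

```python
# Illustrative sketch of how the sidecar chains the three pipeline stages.
# The Sidecar class and credential_provider interface are hypothetical.
class Sidecar:
    def __init__(self, plugin, credential_provider):
        self.plugin = plugin
        self.credential_provider = credential_provider

    def run_pipeline(self, subscription_request):
        # Stage 1: kernel subscription request -> vendor-specific query
        query = self.plugin.transform_query(subscription_request)
        # Stage 2: credentials issued for this single call only
        credentials = self.credential_provider.issue()
        raw = self.plugin.extract_data(query, credentials)
        # Stage 3: opaque vendor payload -> kernel-canonical events
        return self.plugin.transform_data(query, raw)
```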
Streaming Hooks¶
DGM plugins that support real-time streaming implement these additional methods:
on_subscribe¶
Initiates a streaming subscription at the specified resolution.
```python
def on_subscribe(self, symbol, resolution):
    vendor_symbol = self._resolve_symbol(symbol)
    self.feed.subscribe(vendor_symbol, resolution)
```
on_unsubscribe¶
Terminates a streaming subscription.
```python
def on_unsubscribe(self, symbol):
    vendor_symbol = self._resolve_symbol(symbol)
    self.feed.unsubscribe(vendor_symbol)
```
on_tick¶
Normalizes a single raw tick from the vendor feed. Must include ts_event and ts_init.
```python
def on_tick(self, raw_tick):
    return NormalizedTick(
        canonical_id=self._resolve_symbol(raw_tick.symbol),
        ts_event=raw_tick.exchange_timestamp,
        ts_init=datetime.utcnow(),
        price=raw_tick.last_price,
        size=raw_tick.last_size,
        conditions=raw_tick.conditions,
    )
```
Subscription management is handled by the sidecar. The plugin must not maintain its own subscription registry. A DGM plugin may support batch only, streaming only, or both (declared in the capability manifest).
Feed Status¶
DGM plugins must publish feed status updates to platform.data.feed.status whenever connection state changes.
```python
self.publish_feed_status(
    feed_id=self.instance_id,
    status="CONNECTED",  # CONNECTED | DISCONNECTED | DEGRADED | RECONNECTING
    symbols_count=len(self.active_subscriptions),
)
```
On DEGRADED or DISCONNECTED, the plugin must also invoke `on_degrade(reason)` on the base lifecycle.
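A disconnect handler therefore pairs the status publish with the lifecycle hook. A minimal sketch, assuming the `publish_feed_status` and `on_degrade` methods from the sections above (the `handle_connection_loss` helper itself is illustrative):

```python
# Sketch: on connection loss, publish DISCONNECTED first, then notify the
# base lifecycle. Only publish_feed_status and on_degrade come from the SDK;
# this helper function is hypothetical.
def handle_connection_loss(plugin, reason):
    plugin.publish_feed_status(
        feed_id=plugin.instance_id,
        status="DISCONNECTED",
        symbols_count=len(plugin.active_subscriptions),
    )
    plugin.on_degrade(reason)
```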
Capabilities Declaration¶
Every DGM plugin declares its capabilities at registration:
```python
class MyVendorDGM(DGMPlugin):
    vendor_name = "polygon"
    supported_data_types = ["time_series", "snapshot"]
    supported_asset_classes = ["EQUITY", "FX", "CRYPTO"]
    supported_identifiers = ["TICKER", "FIGI"]
    supports_streaming = True
    supports_historical = True
    min_resolution = "tick"
    max_resolution = "1M"
```
| Capability | Description |
|---|---|
| `vendor_name` | Unique vendor identifier (e.g., `"bloomberg"`, `"polygon"`, `"databento"`) |
| `supported_data_types` | Data shapes produced: `time_series`, `snapshot`, `matrix`, `tabular`, `document`, `image`, `audio`, `geospatial`, `graph`, `event_log` |
| `supported_asset_classes` | Asset classes covered: `EQUITY`, `FIXED_INCOME`, `FX`, `DERIVATIVE`, `COMMODITY`, `CRYPTO` |
| `supported_identifiers` | External identifier types the plugin can resolve: `TICKER`, `FIGI`, `ISIN`, `CUSIP`, `SEDOL`, `RIC`, `BLOOMBERG_ID` |
| `supports_streaming` | Whether the plugin supports real-time data |
| `supports_historical` | Whether the plugin supports historical/batch queries |
Field Mapping¶
Each DGM plugin provides a field map declaring how vendor-native field names map to canonical field names. The sidecar validates the field map at registration.
```python
field_map = {
    "PX_LAST": "close",
    "PX_OPEN": "open",
    "PX_HIGH": "high",
    "PX_LOW": "low",
    "VOLUME": "volume",
    "PX_BID": "bid",
    "PX_ASK": "ask",
    "CUR_MKT_CAP": "market_cap",
}
```
Every published data point carries FieldSource metadata identifying the vendor, vendor field name, and vendor timestamp. This enables the vendor priority waterfall and audit trail.
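The exact shape of `FieldSource` is defined by the SDK; as a sketch of the provenance it carries, based only on the description above (field names and the nanosecond timestamp convention are assumptions):

```python
from dataclasses import dataclass

# Sketch of the provenance record attached to each published data point.
# The real SDK type may differ; these fields mirror the prose description.
@dataclass(frozen=True)
class FieldSource:
    vendor: str            # e.g. "bloomberg"
    vendor_field: str      # vendor-native field name, e.g. "PX_LAST"
    vendor_timestamp: int  # vendor-reported timestamp (assumed ns since epoch)

# Example: provenance for a canonical "close" sourced from Bloomberg PX_LAST.
close_source = FieldSource(
    vendor="bloomberg",
    vendor_field="PX_LAST",
    vendor_timestamp=1700000000000000000,
)
```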
Credential Injection¶
Credentials are injected by the sidecar into the `extract_data` call only. The plugin must not:

- Store credentials in memory beyond a single `extract_data` invocation
- Log credentials at any level
- Transmit credentials to any endpoint other than the vendor API (via the egress proxy)
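The "never log" rule is easiest to uphold with a redaction step in front of any logger. A minimal sketch; the `SENSITIVE_KEYS` set is an illustrative convention, not an SDK constant:

```python
# Sketch: mask credential-like values before a payload ever reaches a logger.
# The key list is an assumed convention; extend it per vendor.
SENSITIVE_KEYS = {"api_key", "api_secret", "token", "password"}

def redact(payload: dict) -> dict:
    """Return a copy of payload that is safe to log."""
    return {
        k: ("***" if k.lower() in SENSITIVE_KEYS else v)
        for k, v in payload.items()
    }
```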
Full Example¶
```python
import json
from datetime import datetime

import requests

from meridian.sdk import DGMPlugin, NormalizedEvent, NormalizedQuery, NormalizedTick


class PolygonDGM(DGMPlugin):
    vendor_name = "polygon"
    supported_data_types = ["time_series"]
    supported_asset_classes = ["EQUITY"]
    supports_streaming = True
    supports_historical = True

    def transform_query(self, subscription_request):
        # Polygon tickers match kernel symbols here, so no symbology lookup.
        return NormalizedQuery(
            vendor_symbol=subscription_request.symbol,
            fields=["o", "h", "l", "c", "v"],
            start_date=subscription_request.start,
            end_date=subscription_request.end,
        )

    def extract_data(self, query, credentials):
        resp = requests.get(
            f"https://api.polygon.io/v2/aggs/ticker/{query.vendor_symbol}"
            f"/range/1/day/{query.start_date}/{query.end_date}",
            headers={"Authorization": f"Bearer {credentials['api_key']}"},
            timeout=30,
        )
        resp.raise_for_status()
        return resp.content

    def transform_data(self, query, raw_response):
        data = json.loads(raw_response)
        events = []
        for bar in data.get("results", []):
            events.append(NormalizedEvent(
                canonical_id=query.canonical_id,
                ts_event=bar["t"],
                ts_init=int(datetime.utcnow().timestamp() * 1e9),
                fields={
                    "open": bar["o"],
                    "high": bar["h"],
                    "low": bar["l"],
                    "close": bar["c"],
                    "volume": bar["v"],
                },
            ))
        return events

    def on_subscribe(self, symbol, resolution):
        self.ws.subscribe(f"A.{symbol}")

    def on_unsubscribe(self, symbol):
        self.ws.unsubscribe(f"A.{symbol}")

    def on_tick(self, raw_tick):
        return NormalizedTick(
            canonical_id=raw_tick["sym"],
            ts_event=raw_tick["t"],
            ts_init=int(datetime.utcnow().timestamp() * 1e9),
            price=raw_tick["p"],
            size=raw_tick["s"],
        )
```
Bus Topics¶
| Topic | Direction | Description |
|---|---|---|
| `platform.dgm.{id}.data.price.eod` | Publish | End-of-day price events |
| `platform.dgm.{id}.data.price.tick` | Publish | Real-time tick events |
| `platform.dgm.{id}.data.reference.instrument` | Publish | Instrument reference data |
| `platform.dgm.{id}.data.alt.{subtype}` | Publish | Alternative data events |
| `platform.data.feed.status` | Publish | Feed status changes |
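The per-instance topics all follow the `platform.dgm.{id}.data.*` pattern, so topic strings can be built rather than hand-written. A small helper sketch (`dgm_topic` is illustrative, not an SDK function):

```python
# Sketch: build a per-instance DGM bus topic from its path segments.
# The helper name is hypothetical; the topic layout follows the table above.
def dgm_topic(instance_id: str, *segments: str) -> str:
    """E.g. dgm_topic("abc", "price", "eod") -> "platform.dgm.abc.data.price.eod"."""
    return ".".join(["platform", "dgm", instance_id, "data", *segments])
```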