Real-Time Market Data Infrastructure
Comprehensive guide to Neutryx's real-time market data feeds, database connectors, and data validation framework.
Table of Contents
- Overview
- Quick Start
- Market Data Adapters
- Database Storage
- Data Validation
- Real-Time Feeds
- Examples
- Production Deployment
Overview
Neutryx provides enterprise-grade market data infrastructure with:
- Bloomberg & Refinitiv Integration: Native support for major data vendors
- Multi-Database Storage: PostgreSQL, MongoDB, and TimescaleDB connectors
- Data Validation: Comprehensive quality checks and anomaly detection
- Real-Time Feeds: Streaming data with automatic failover
- Production-Ready: Built for high-throughput, low-latency applications
Quick Start
Install Dependencies
# Core dependencies (already in requirements.txt)
pip install asyncpg motor
# Optional: Bloomberg API
pip install blpapi
# Optional: Refinitiv API
pip install refinitiv-dataplatform eikon
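Before wiring anything up, you can confirm which optional SDKs are importable with a quick sanity check (a sketch; missing vendor packages simply report as absent):

import importlib

# Check core and optional dependencies
for mod in ("asyncpg", "motor", "blpapi", "refinitiv.dataplatform"):
    try:
        importlib.import_module(mod)
        print(f"{mod}: OK")
    except ImportError as exc:
        print(f"{mod}: missing ({exc})")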
Basic Usage
from neutryx.market.adapters import BloombergAdapter, BloombergConfig
from neutryx.market.storage import TimescaleDBStorage, TimescaleDBConfig
from neutryx.market.validation import ValidationPipeline, PriceRangeValidator
from neutryx.market.feeds import FeedManager

# Configure Bloomberg
bloomberg_config = BloombergConfig(
    adapter_name="bloomberg",
    host="localhost",
    port=8194
)

# Configure TimescaleDB
storage_config = TimescaleDBConfig(
    host="localhost",
    port=5432,
    database="market_data",
    username="trader",
    password="secret",
    compression_enabled=True
)

# Create components
adapter = BloombergAdapter(bloomberg_config)
storage = TimescaleDBStorage(storage_config)

# Set up validation
pipeline = ValidationPipeline()
pipeline.add_validator(PriceRangeValidator(min_price=0, max_price=10000))

# Create feed manager
manager = FeedManager(
    adapters=[adapter],
    storage=storage,
    validation_pipeline=pipeline
)

# Start and subscribe (run inside an async function; see the complete example below)
await manager.start()
await manager.subscribe("equity", ["AAPL", "MSFT", "GOOGL"])
Market Data Adapters
Bloomberg Adapter
Connect to Bloomberg Terminal or Bloomberg Server API (SAPI/BPIPE).
import asyncio

from neutryx.market.adapters import BloombergAdapter, BloombergConfig

config = BloombergConfig(
    adapter_name="bloomberg",
    host="localhost",  # Bloomberg API host
    port=8194,         # Bloomberg API port
    application_name="Neutryx",
    timeout_ms=5000,
    cache_enabled=True
)

adapter = BloombergAdapter(config)
await asyncio.to_thread(adapter.connect)  # run the blocking connect off the event loop

# Get equity quote
quote = adapter.get_equity_quote("AAPL US Equity")
print(f"Price: {quote.price}, Volume: {quote.volume}")

# Get FX quote
fx_quote = adapter.get_fx_quote("EUR", "USD")
print(f"EUR/USD: {fx_quote.spot}")

# Get bond quote
bond_quote = adapter.get_bond_quote("US912828ZG94", id_type="isin")
print(f"Yield: {bond_quote.yield_to_maturity:.2%}")
Bloomberg Field Reference:
- PX_LAST: Last price
- PX_BID: Bid price
- PX_ASK: Ask price
- PX_VOLUME: Volume
- YLD_YTM_MID: Yield to maturity
- Z_SPRD_MID: Credit spread
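When a field is not exposed through the adapter's convenience methods, you can issue a raw reference-data request directly against blpapi (a minimal sketch; assumes a Terminal or SAPI endpoint on localhost:8194):

import blpapi

options = blpapi.SessionOptions()
options.setServerHost("localhost")
options.setServerPort(8194)

session = blpapi.Session(options)
session.start()
session.openService("//blp/refdata")

service = session.getService("//blp/refdata")
request = service.createRequest("ReferenceDataRequest")
request.getElement("securities").appendValue("AAPL US Equity")
request.getElement("fields").appendValue("PX_LAST")
session.sendRequest(request)

# Drain the event queue until the final RESPONSE event arrives
while True:
    event = session.nextEvent(500)
    for msg in event:
        print(msg)
    if event.eventType() == blpapi.Event.RESPONSE:
        break

session.stop()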
Refinitiv Adapter
Connect to Refinitiv Data Platform (RDP) or Eikon Desktop.
from neutryx.market.adapters import RefinitivAdapter, RefinitivConfig

# For Eikon Desktop
config = RefinitivConfig(
    adapter_name="refinitiv",
    app_key="your_app_key",
    use_desktop=True
)

# For Refinitiv Data Platform
config_rdp = RefinitivConfig(
    adapter_name="refinitiv",
    app_key="your_app_key",
    username="your_username",
    password="your_password",
    use_desktop=False
)

adapter = RefinitivAdapter(config)  # or RefinitivAdapter(config_rdp) for RDP
adapter.connect()
# Get equity quote (using RIC)
quote = adapter.get_equity_quote("AAPL.O") # Apple on NASDAQ
# Get FX quote
fx_quote = adapter.get_fx_quote("EUR", "USD") # EUR=
# Get commodity quote
commodity = adapter.get_commodity_quote("LCOc1") # Brent crude
Reuters Instrument Code (RIC) Examples:
- Equity: AAPL.O (Apple on NASDAQ), MSFT.O
- FX: EUR= (EUR/USD), JPY= (USD/JPY)
- Commodities: LCOc1 (Brent crude), CLc1 (WTI crude)
- Bonds: US10YT=RR (US 10Y Treasury yield)
Database Storage
PostgreSQL Storage
High-performance relational storage with time-series optimizations.
from datetime import datetime

from neutryx.market.storage import PostgreSQLStorage, PostgreSQLConfig

config = PostgreSQLConfig(
    host="localhost",
    port=5432,
    database="market_data",
    username="trader",
    password="secret",
    schema="market_data",
    table_prefix="md_",
    connection_pool_size=20
)

storage = PostgreSQLStorage(config)
await storage.connect()

# Create indexes
await storage.create_indexes()

# Store quote
await storage.store_quote(
    data_type="equity",
    symbol="AAPL",
    timestamp=datetime.utcnow(),
    data={
        "price": 150.25,
        "bid": 150.20,
        "ask": 150.30,
        "volume": 1000000
    }
)

# Store batch
records = [
    {
        "symbol": "AAPL",
        "timestamp": datetime.utcnow(),
        "price": 150.25,
        "volume": 1000000
    },
    # ... more records
]
await storage.store_batch("equity", records)

# Query latest
latest = await storage.query_latest("equity", "AAPL")
print(f"Latest price: {latest['price']}")

# Query time range
data = await storage.query_time_range(
    "equity",
    "AAPL",
    start_time=datetime(2025, 1, 1),
    end_time=datetime(2025, 1, 31),
    limit=1000
)

# Query aggregated (OHLCV)
ohlcv = await storage.query_aggregated(
    "equity",
    "AAPL",
    start_time=datetime(2025, 1, 1),
    end_time=datetime(2025, 1, 31),
    interval="1min"
)
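The aggregated results hand off cleanly to analysis code. A small sketch, assuming each OHLCV record comes back as a dict with a time-bucket key plus open/high/low/close/volume fields (the field names here are illustrative, not confirmed by the API):

import pandas as pd

# Hypothetical record shape: {"bucket": ..., "open": ..., "high": ..., "low": ..., "close": ..., "volume": ...}
df = pd.DataFrame(ohlcv)
df = df.set_index("bucket").sort_index()
print(df[["open", "high", "low", "close", "volume"]].tail())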
MongoDB Storage
Flexible document storage for heterogeneous market data.
from datetime import datetime

from neutryx.market.storage import MongoDBStorage, MongoDBConfig

config = MongoDBConfig(
    host="localhost",
    port=27017,
    database="market_data",
    username="trader",
    password="secret",
    collection_prefix="md_",
    use_time_series=True  # MongoDB 5.0+ time-series collections
)

storage = MongoDBStorage(config)
await storage.connect()

# Store with flexible schema
await storage.store_quote(
    data_type="equity",
    symbol="AAPL",
    timestamp=datetime.utcnow(),
    data={
        "price": 150.25,
        "volume": 1000000,
        "custom_field": "value",  # Flexible schema
        "metadata": {"exchange": "NASDAQ"}
    }
)

# Aggregation pipeline
aggregated = await storage.query_aggregated(
    "equity",
    "AAPL",
    start_time=datetime(2025, 1, 1),
    end_time=datetime(2025, 1, 31),
    interval="1min"
)
TimescaleDB Storage
Optimized time-series database with automatic partitioning and compression.
from datetime import datetime

from neutryx.market.storage import TimescaleDBStorage, TimescaleDBConfig

config = TimescaleDBConfig(
    host="localhost",
    port=5432,
    database="market_data",
    username="trader",
    password="secret",
    chunk_time_interval="1 day",  # Partition by day
    compression_enabled=True,     # Auto-compress old data
    compression_after="7 days",   # Compress after 7 days
    retention_policy_days=90      # Auto-delete after 90 days
)

storage = TimescaleDBStorage(config)
await storage.connect()

# Create hypertables and continuous aggregates
await storage.create_indexes()

# Queries use continuous aggregates for speed
ohlcv = await storage.query_aggregated(
    "equity",
    "AAPL",
    start_time=datetime(2025, 1, 1),
    end_time=datetime(2025, 1, 31),
    interval="1min"
)

# Get TimescaleDB-specific statistics
stats = await storage.get_statistics()
print(f"Compression ratio: {stats['timescaledb']['compression_ratio']:.1%}")

# Maintenance
await storage.vacuum_and_analyze()
TimescaleDB Features:
- Hypertables: Automatic partitioning by time
- Compression: Up to 90% storage reduction
- Continuous Aggregates: Pre-computed OHLCV bars
- Retention Policies: Automatic data cleanup
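For orientation, the provisioning that create_indexes() performs corresponds roughly to the following TimescaleDB calls, shown here through asyncpg (illustrative only; the md_equity table name assumes the default table_prefix, and the actual DDL may differ):

import asyncpg

# Run inside an async context
conn = await asyncpg.connect(host="localhost", port=5432,
                             database="market_data", user="trader", password="secret")

# Partition by time (hypertable), then attach compression and retention policies
await conn.execute(
    "SELECT create_hypertable('md_equity', 'timestamp', "
    "chunk_time_interval => INTERVAL '1 day', if_not_exists => TRUE)"
)
await conn.execute("ALTER TABLE md_equity SET (timescaledb.compress)")
await conn.execute("SELECT add_compression_policy('md_equity', INTERVAL '7 days')")
await conn.execute("SELECT add_retention_policy('md_equity', INTERVAL '90 days')")
await conn.close()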
Data Validation
Validation Pipeline
Comprehensive data quality checks with customizable validators.
from datetime import datetime

from neutryx.market.validation import (
    ValidationPipeline,
    PriceRangeValidator,
    SpreadValidator,
    VolumeValidator,
    VolatilityValidator,
    TimeSeriesValidator
)

# Create pipeline
pipeline = ValidationPipeline()

# Add validators
pipeline.add_validator(PriceRangeValidator(
    min_price=0.0,
    max_price=100000.0,
    max_jump_pct=0.20  # 20% max price jump
))
pipeline.add_validator(SpreadValidator(
    max_spread_pct=0.05,  # 5% max spread
    check_mid_consistency=True
))
pipeline.add_validator(VolumeValidator(
    min_volume=0.0,
    max_volume_multiplier=10.0  # 10x average volume
))
pipeline.add_validator(VolatilityValidator(
    min_vol=0.01,  # 1%
    max_vol=3.0    # 300%
))
pipeline.add_validator(TimeSeriesValidator(
    max_gap_seconds=300  # 5 minutes max gap
))

# Validate data
data = {
    "symbol": "AAPL",
    "timestamp": datetime.utcnow(),
    "price": 150.25,
    "bid": 150.20,
    "ask": 150.30,
    "volume": 1000000,
    "volatility": 0.25
}
results = pipeline.validate(data)

# Check results
for result in results:
    if not result.passed:
        print(f"[{result.severity.value.upper()}] {result.validator_name}: {result.message}")

# Get quality report
report = pipeline.get_quality_report()
print(f"Quality Score: {report.metrics.quality_score:.1%}")
print(f"Total Records: {report.metrics.total_records}")
print(f"Errors: {report.metrics.error_records}")
print(f"Warnings: {report.metrics.warning_records}")

# Get recommendations
for recommendation in report.recommendations:
    print(f"- {recommendation}")
Custom Validators
Create custom validators for specific requirements.
from typing import Any, Dict

from neutryx.market.validation import BaseValidator, ValidationResult, ValidationSeverity

class CustomValidator(BaseValidator):
    def __init__(self):
        super().__init__("CustomValidator")

    def validate(self, data: Dict[str, Any]) -> ValidationResult:
        # Custom validation logic
        price = data.get("price")
        if price and price % 1 != 0:
            return ValidationResult(
                validator_name=self.name,
                passed=False,
                severity=ValidationSeverity.WARNING,
                message="Price has fractional component",
                details={"price": price}
            )
        return ValidationResult(
            validator_name=self.name,
            passed=True,
            message="Custom validation passed"
        )

# Add to pipeline
pipeline.add_validator(CustomValidator())
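A quick standalone check that the validator behaves as intended (expected output follows from the ValidationResult fields shown above):

# A fractional price should trigger the warning
result = CustomValidator().validate({"price": 150.25})
print(result.passed, result.message)  # False  Price has fractional component

# A whole-number price should pass
result = CustomValidator().validate({"price": 150.0})
print(result.passed, result.message)  # True  Custom validation passed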
Real-Time Feeds
Feed Manager
Orchestrate real-time data feeds with automatic failover and storage.
from neutryx.market.feeds import FeedManager, FeedConfig

# Configure feed manager
feed_config = FeedConfig(
    enable_validation=True,
    enable_storage=True,
    buffer_size=1000,
    flush_interval_seconds=60,
    max_errors=10,
    enable_failover=True
)

# Create manager with multiple adapters (failover)
manager = FeedManager(
    adapters=[bloomberg_adapter, refinitiv_adapter],  # Ordered by priority
    storage=storage,
    validation_pipeline=pipeline,
    config=feed_config
)

# Start feed
await manager.start()

# Subscribe to symbols
await manager.subscribe("equity", ["AAPL", "MSFT", "GOOGL"])
await manager.subscribe("fx", ["EURUSD", "GBPUSD"])

# Add callback for real-time updates
def on_data_update(data):
    print(f"Update: {data['symbol']} @ {data['price']}")

manager.add_callback(on_data_update)

# Get statistics
stats = manager.get_statistics()
print(f"Status: {stats['status']}")
print(f"Current Adapter: {stats['current_adapter']}")
print(f"Quality Score: {stats['quality_score']:.1%}")

# Stop feed
await manager.stop()
Data Subscriber
Manage subscriptions with callbacks.
from neutryx.market.feeds import DataSubscriber, SubscriptionRequest

subscriber = DataSubscriber()

# Subscribe with callback
def on_equity_update(data):
    print(f"Equity update: {data}")

subscription_id = subscriber.subscribe(
    SubscriptionRequest(
        data_type="equity",
        symbols=["AAPL", "MSFT"],
        callback=on_equity_update
    )
)

# Add global callback
def on_any_update(data):
    print(f"Data update: {data}")

subscriber.add_global_callback(on_any_update)

# Notify (typically called by the feed manager)
subscriber.notify({
    "data_type": "equity",
    "symbol": "AAPL",
    "price": 150.25
})

# Unsubscribe
subscriber.unsubscribe(subscription_id)
Examples
Complete Real-Time Pipeline
import asyncio

from neutryx.market.adapters import BloombergAdapter, BloombergConfig
from neutryx.market.storage import TimescaleDBStorage, TimescaleDBConfig
from neutryx.market.validation import (
    ValidationPipeline,
    PriceRangeValidator,
    SpreadValidator,
    VolumeValidator
)
from neutryx.market.feeds import FeedManager, FeedConfig

async def main():
    # Configure Bloomberg
    bloomberg_config = BloombergConfig(
        adapter_name="bloomberg",
        host="localhost",
        port=8194
    )

    # Configure TimescaleDB
    storage_config = TimescaleDBConfig(
        host="localhost",
        port=5432,
        database="market_data",
        username="trader",
        password="secret",
        compression_enabled=True,
        retention_policy_days=90
    )

    # Initialize components
    adapter = BloombergAdapter(bloomberg_config)
    storage = TimescaleDBStorage(storage_config)

    # Connect to storage
    await storage.connect()
    await storage.create_indexes()

    # Set up validation pipeline
    pipeline = ValidationPipeline()
    pipeline.add_validator(PriceRangeValidator(min_price=0, max_jump_pct=0.20))
    pipeline.add_validator(SpreadValidator(max_spread_pct=0.05))
    pipeline.add_validator(VolumeValidator(max_volume_multiplier=10.0))

    # Create feed manager
    feed_config = FeedConfig(
        enable_validation=True,
        enable_storage=True,
        flush_interval_seconds=60
    )
    manager = FeedManager(
        adapters=[adapter],
        storage=storage,
        validation_pipeline=pipeline,
        config=feed_config
    )

    # Start and subscribe
    await manager.start()
    await manager.subscribe("equity", ["AAPL", "MSFT", "GOOGL", "AMZN"])

    # Run for some time
    await asyncio.sleep(3600)  # 1 hour

    # Get quality report
    report = pipeline.get_quality_report()
    print(f"Quality Score: {report.metrics.quality_score:.1%}")
    print(f"Total Records: {report.metrics.total_records}")

    # Stop
    await manager.stop()

if __name__ == "__main__":
    asyncio.run(main())
Multi-Source Aggregation
# Use multiple adapters with automatic failover
bloomberg_adapter = BloombergAdapter(bloomberg_config)
refinitiv_adapter = RefinitivAdapter(refinitiv_config)

# Feed manager automatically fails over
manager = FeedManager(
    adapters=[bloomberg_adapter, refinitiv_adapter],  # Priority order
    storage=storage,
    validation_pipeline=pipeline,
    config=FeedConfig(enable_failover=True)
)
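To observe failovers in production, you can poll the statistics exposed by the manager (a sketch built on the current_adapter key shown earlier; the 30-second polling interval is arbitrary):

import asyncio
import logging

log = logging.getLogger("feeds")

async def watch_failover(manager, expected_adapter="bloomberg", interval_s=30):
    # Warn whenever the active adapter is no longer the expected primary
    while True:
        stats = manager.get_statistics()
        if stats["current_adapter"] != expected_adapter:
            log.warning("Failed over from %s to %s",
                        expected_adapter, stats["current_adapter"])
        await asyncio.sleep(interval_s)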
Production Deployment
High Availability Setup
# TimescaleDB (single node; add replication on top for true HA)
docker run -d \
  --name timescaledb \
  -p 5432:5432 \
  -e POSTGRES_PASSWORD=secret \
  -e POSTGRES_DB=market_data \
  timescale/timescaledb:latest-pg14

# MongoDB (single node; configure a replica set for production HA)
docker run -d \
  --name mongodb \
  -p 27017:27017 \
  -e MONGO_INITDB_ROOT_USERNAME=admin \
  -e MONGO_INITDB_ROOT_PASSWORD=secret \
  mongo:6.0
Environment Variables
# Bloomberg
export BLOOMBERG_HOST=localhost
export BLOOMBERG_PORT=8194
# Refinitiv
export REFINITIV_APP_KEY=your_app_key
export REFINITIV_USERNAME=your_username
export REFINITIV_PASSWORD=your_password
# TimescaleDB
export TIMESCALEDB_HOST=localhost
export TIMESCALEDB_PORT=5432
export TIMESCALEDB_DATABASE=market_data
export TIMESCALEDB_USERNAME=trader
export TIMESCALEDB_PASSWORD=secret
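A small sketch for loading these variables into the config objects used throughout this guide (plain os.environ lookups, with fallbacks where sensible):

import os

from neutryx.market.storage import TimescaleDBConfig

storage_config = TimescaleDBConfig(
    host=os.environ.get("TIMESCALEDB_HOST", "localhost"),
    port=int(os.environ.get("TIMESCALEDB_PORT", "5432")),
    database=os.environ["TIMESCALEDB_DATABASE"],
    username=os.environ["TIMESCALEDB_USERNAME"],
    password=os.environ["TIMESCALEDB_PASSWORD"]
)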
Performance Tuning
# Optimize storage configuration
storage_config = TimescaleDBConfig(
    host="localhost",
    port=5432,
    database="market_data",
    username="trader",
    password="secret",
    connection_pool_size=50,      # Increase pool size
    chunk_time_interval="1 day",  # Adjust chunk size
    compression_enabled=True,
    compression_after="1 day",    # Compress aggressively
    retention_policy_days=30      # Shorter retention
)

# Optimize feed configuration
feed_config = FeedConfig(
    buffer_size=5000,           # Larger buffer
    flush_interval_seconds=30,  # More frequent flushes
    max_errors=20,              # Higher tolerance
    enable_failover=True
)
Monitoring
# Get statistics
storage_stats = await storage.get_statistics()
print(f"Total Size: {storage_stats['total_size_bytes'] / 1e9:.2f} GB")
print(f"Total Rows: {storage_stats['total_rows']:,}")

if "timescaledb" in storage_stats:
    print(f"Compression Ratio: {storage_stats['timescaledb']['compression_ratio']:.1%}")
    print(f"Total Chunks: {storage_stats['timescaledb']['total_chunks']}")

# Feed statistics
feed_stats = manager.get_statistics()
print(f"Feed Status: {feed_stats['status']}")
print(f"Quality Score: {feed_stats.get('quality_score', 0):.1%}")
Best Practices
- Use TimescaleDB for Time-Series: Best performance for tick data
- Enable Compression: Reduces storage by up to 90%
- Set Retention Policies: Auto-delete old data
- Validate All Data: Catch issues early
- Use Connection Pooling: Better performance
- Monitor Quality Scores: Track data quality
- Enable Failover: Ensure high availability
- Flush Regularly: Don't lose data
Troubleshooting
Bloomberg Connection Issues
# Check Bloomberg service
import blpapi
session_options = blpapi.SessionOptions()
session_options.setServerHost("localhost")
session_options.setServerPort(8194)
session = blpapi.Session(session_options)
print(session.start()) # Should return True
Storage Connection Issues
# Test PostgreSQL connection (run inside an async context)
import asyncpg

conn = await asyncpg.connect(
    host="localhost",
    port=5432,
    database="market_data",
    user="trader",
    password="secret"
)
print(await conn.fetchval("SELECT version()"))
await conn.close()
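For the MongoDB backend, an equivalent check via motor (a sketch; the credentials follow the docker example above):

# Test MongoDB connection (run inside an async context)
from motor.motor_asyncio import AsyncIOMotorClient

client = AsyncIOMotorClient("mongodb://admin:secret@localhost:27017")
print(await client.admin.command("ping"))  # {'ok': 1.0}
client.close()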
Performance Issues
-- Check TimescaleDB chunk statistics
SELECT * FROM timescaledb_information.chunks;

-- Check table sizes
SELECT
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname = 'market_data'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;