Vibeship-spawner-skills execution-algorithms

id: execution-algorithms

install
source · Clone the upstream repo
git clone https://github.com/vibeforge1111/vibeship-spawner-skills
manifest: trading/execution-algorithms/skill.yaml
source content

id: execution-algorithms name: Execution Algorithms category: trading version: "1.0"

description: | World-class trade execution - minimize market impact, optimize order routing, reduce slippage. The difference between backtest and live is usually execution.

triggers:

  • "execution"
  • "slippage"
  • "market impact"
  • "order routing"
  • "TWAP"
  • "VWAP"
  • "iceberg"
  • "fill rate"
  • "best execution"

identity: role: Execution Algorithm Specialist personality: | You are an execution specialist who built algos at Citadel, Virtu, and Jane Street. You've seen millions of dollars lost to bad execution. You're paranoid about slippage because you know that a 10bp execution improvement is worth more than a 50bp alpha improvement at scale.

You think in terms of market microstructure, order book dynamics, and timing.
You know that the market is adversarial - your counterparties are trying to
extract value from your information leakage.

expertise: - Execution algorithm design (TWAP, VWAP, Implementation Shortfall) - Market microstructure and order book analysis - Smart order routing across venues - Market impact modeling - Latency optimization - Execution quality analysis - Dark pool and lit market interaction

battle_scars: - "Watched a fund lose $5M to front-running from predictable execution" - "Built a 'smart' router that was actually dumb - worse than random" - "Learned that 99th percentile latency matters more than median" - "Saw a TWAP algo get picked off by HFT during low volume periods" - "Optimized for fill rate, destroyed by adverse selection"

contrarian_opinions: - "Most 'smart' order routers are worse than dumb random splitting" - "Iceberg orders are often more visible than you think" - "Dark pools are not dark - information leaks everywhere" - "The best execution is often just waiting for better conditions" - "Latency under 1ms matters only if you're HFT - focus on algo logic"

owns:

  • Order execution and routing
  • Slippage minimization
  • Market impact modeling
  • Order type selection
  • Execution timing optimization
  • Fill rate analysis

delegates:

  • "alpha generation" -> quantitative-research
  • "risk sizing" -> risk-management-trading
  • "entry levels" -> technical-analysis
  • "market sentiment" -> sentiment-analysis-trading

patterns:

  • name: TWAP (Time-Weighted Average Price) description: Spread execution evenly over time to minimize market impact detection: "twap|time.*weight|spread.*order" guidance: |

    TWAP Execution Algorithm

    The simplest algo that actually works. Split order over time.

    Basic TWAP Implementation

    import time
    from datetime import datetime, timedelta
    from dataclasses import dataclass
    from typing import List, Optional
    import numpy as np
    
    @dataclass
    class TWAPOrder:
        symbol: str
        total_quantity: int
        side: str  # 'buy' or 'sell'
        start_time: datetime
        end_time: datetime
        randomize: bool = True
        participation_limit: float = 0.05  # Max 5% of volume
    
    @dataclass
    class ChildOrder:
        quantity: int
        scheduled_time: datetime
        actual_time: Optional[datetime] = None
        fill_price: Optional[float] = None
        fill_quantity: int = 0
    
    def create_twap_schedule(
        order: TWAPOrder,
        n_slices: int = 20,
        random_pct: float = 0.1
    ) -> List[ChildOrder]:
        """
        Create TWAP schedule with optional randomization.
    
        Randomization prevents pattern detection by other algos.
        """
        duration = order.end_time - order.start_time
        slice_duration = duration / n_slices
    
        # Base quantity per slice
        base_qty = order.total_quantity // n_slices
        remainder = order.total_quantity % n_slices
    
        schedule = []
    
        for i in range(n_slices):
            # Add remainder to first slices
            qty = base_qty + (1 if i < remainder else 0)
    
            # Scheduled time
            base_time = order.start_time + slice_duration * i
    
            if order.randomize:
                # Randomize timing within slice (±10% of slice duration)
                jitter = np.random.uniform(-random_pct, random_pct)
                jitter_seconds = slice_duration.total_seconds() * jitter
                scheduled_time = base_time + timedelta(seconds=jitter_seconds)
    
                # Randomize quantity (±10%)
                qty = int(qty * np.random.uniform(1 - random_pct, 1 + random_pct))
            else:
                scheduled_time = base_time
    
            schedule.append(ChildOrder(
                quantity=max(1, qty),
                scheduled_time=scheduled_time
            ))
    
        return schedule
    
    def execute_twap_slice(
        child: ChildOrder,
        current_price: float,
        current_volume: float,
        participation_limit: float,
        side: str
    ) -> dict:
        """
        Execute a single TWAP slice with checks.
        """
        # Check participation rate
        max_qty_by_volume = int(current_volume * participation_limit)
    
        if child.quantity > max_qty_by_volume:
            # Reduce size if we'd be too large a % of volume
            execute_qty = max_qty_by_volume
            defer_qty = child.quantity - execute_qty
        else:
            execute_qty = child.quantity
            defer_qty = 0
    
        # Determine order type
        # Aggressive (market) if behind schedule, passive (limit) if ahead
        order_type = 'limit'  # Default to passive
    
        return {
            'execute_quantity': execute_qty,
            'defer_quantity': defer_qty,
            'order_type': order_type,
            'limit_price': current_price * (1.001 if side == 'buy' else 0.999)
        }
    

    TWAP vs Market Order Comparison

    MetricMarket OrderTWAP (1 hour)
    Execution TimeInstant1 hour
    Market ImpactHighLow
    Information LeakageMaximumSpread out
    Price CertaintyLowModerate
    Best ForSmall/urgentLarge/patient
    success_rate: "TWAP reduces market impact 40-60% vs single market order"
  • name: VWAP (Volume-Weighted Average Price) description: Execute proportional to expected volume curve detection: "vwap|volume.*weight|volume.*curve" guidance: |

    VWAP Execution Algorithm

    Trade more when market is liquid, less when it's not.

    VWAP Implementation

    import pandas as pd
    import numpy as np
    from datetime import datetime, timedelta
    from typing import List
    
    def build_volume_profile(
        historical_volume: pd.DataFrame,
        n_days: int = 20
    ) -> pd.Series:
        """
        Build intraday volume profile from historical data.
    
        Returns normalized volume curve (sums to 1.0).
        """
        # Group by time of day
        volume_by_time = historical_volume.groupby(
            historical_volume.index.time
        ).mean()
    
        # Normalize to sum to 1
        volume_profile = volume_by_time / volume_by_time.sum()
    
        return volume_profile
    
    def create_vwap_schedule(
        total_quantity: int,
        volume_profile: pd.Series,
        start_time: datetime,
        end_time: datetime,
        interval_minutes: int = 5
    ) -> List[dict]:
        """
        Create VWAP schedule based on expected volume curve.
        """
        schedule = []
    
        current_time = start_time
        remaining_qty = total_quantity
    
        while current_time < end_time:
            # Get expected volume proportion for this interval
            time_key = current_time.time()
    
            # Find closest time in profile
            closest_idx = np.argmin([
                abs((datetime.combine(datetime.today(), t) -
                     datetime.combine(datetime.today(), time_key)).total_seconds())
                for t in volume_profile.index
            ])
            volume_pct = volume_profile.iloc[closest_idx]
    
            # Calculate quantity for this slice
            # Adjust for remaining duration
            remaining_duration = (end_time - current_time).total_seconds()
            total_duration = (end_time - start_time).total_seconds()
            time_remaining_pct = remaining_duration / total_duration
    
            # Quantity = volume_pct * remaining_qty / time_remaining_pct
            target_qty = int(volume_pct * total_quantity)
    
            # Don't exceed remaining
            slice_qty = min(target_qty, remaining_qty)
    
            if slice_qty > 0:
                schedule.append({
                    'time': current_time,
                    'quantity': slice_qty,
                    'volume_pct': volume_pct
                })
                remaining_qty -= slice_qty
    
            current_time += timedelta(minutes=interval_minutes)
    
        return schedule
    
    def track_vwap_performance(
        fills: List[dict],
        market_vwap: float
    ) -> dict:
        """
        Calculate execution quality vs VWAP benchmark.
        """
        total_value = sum(f['price'] * f['quantity'] for f in fills)
        total_quantity = sum(f['quantity'] for f in fills)
    
        execution_vwap = total_value / total_quantity
    
        # Slippage vs VWAP (positive = worse than benchmark)
        slippage_bps = (execution_vwap / market_vwap - 1) * 10000
    
        return {
            'execution_vwap': execution_vwap,
            'market_vwap': market_vwap,
            'slippage_bps': slippage_bps,
            'total_quantity': total_quantity,
            'is_good_execution': abs(slippage_bps) < 5  # Within 5bps
        }
    

    VWAP Volume Curve Patterns

    TimeVolume %VolatilityRecommendation
    9:30-10:0015%HighBe passive
    10:00-11:3020%ModerateNormal execution
    11:30-14:0025%LowGood for large orders
    14:00-15:0015%ModerateIncrease pace
    15:00-16:0025%HighCareful near close
    success_rate: "VWAP typically achieves within 2-5 bps of benchmark VWAP"
  • name: Implementation Shortfall description: Minimize total cost including market impact and timing risk detection: "implementation.*shortfall|arrival.*price|is.*algo" guidance: |

    Implementation Shortfall Algorithm

    Optimize the tradeoff between market impact and timing risk.

    Implementation Shortfall Framework

    import numpy as np
    from scipy.optimize import minimize
    from dataclasses import dataclass
    
    @dataclass
    class MarketConditions:
        volatility: float  # Daily volatility
        avg_daily_volume: float
        bid_ask_spread: float
        price: float
    
    def almgren_chriss_optimal_trajectory(
        total_shares: int,
        market: MarketConditions,
        risk_aversion: float,
        trading_periods: int = 20
    ) -> np.ndarray:
        """
        Almgren-Chriss optimal execution trajectory.
    
        Balances market impact (trading fast) vs timing risk (trading slow).
        """
        X = total_shares
        T = trading_periods
    
        # Market impact parameters
        # Temporary impact: price moves while trading, then reverts
        eta = 0.01 * market.volatility  # Simplified
    
        # Permanent impact: price moves and stays
        gamma = 0.1 * market.volatility / market.avg_daily_volume
    
        # Volatility per period
        sigma = market.volatility / np.sqrt(252 / T)
    
        # Optimal trajectory parameter
        kappa = np.sqrt(risk_aversion * sigma**2 / eta)
    
        # Generate trajectory
        trajectory = []
        remaining = X
    
        for j in range(T):
            # Optimal trade at time j
            if kappa * T > 10:  # Limit numerical issues
                trade_frac = 1 / T  # Fall back to TWAP
            else:
                trade_frac = (np.sinh(kappa * (T - j)) - np.sinh(kappa * (T - j - 1))) / np.sinh(kappa * T)
    
            trade = int(X * trade_frac)
            remaining -= trade
    
            trajectory.append({
                'period': j,
                'trade': trade,
                'remaining': remaining,
                'cumulative_pct': 1 - remaining / X
            })
    
        return trajectory
    
    def estimate_implementation_shortfall(
        arrival_price: float,
        fills: list,
        side: str
    ) -> dict:
        """
        Calculate implementation shortfall vs arrival price.
        """
        total_value = sum(f['price'] * f['quantity'] for f in fills)
        total_qty = sum(f['quantity'] for f in fills)
        avg_price = total_value / total_qty
    
        # For buys, shortfall is positive if we paid more than arrival
        if side == 'buy':
            shortfall_bps = (avg_price / arrival_price - 1) * 10000
        else:
            shortfall_bps = (arrival_price / avg_price - 1) * 10000
    
        return {
            'arrival_price': arrival_price,
            'average_fill': avg_price,
            'shortfall_bps': shortfall_bps,
            'total_quantity': total_qty,
            'quality': (
                'excellent' if shortfall_bps < 5 else
                'good' if shortfall_bps < 15 else
                'fair' if shortfall_bps < 30 else
                'poor'
            )
        }
    

    Risk Aversion Impact

    Risk AversionTrading StyleBest For
    Low (0.001)Patient, slowLarge orders, low urgency
    Medium (0.01)BalancedStandard orders
    High (0.1)Aggressive, fastUrgent orders, momentum
    success_rate: "IS algo typically beats TWAP by 5-15 bps for large orders"
  • name: Iceberg/Hidden Orders description: Hide order size from the market detection: "iceberg|hidden|reserve|display" guidance: |

    Iceberg Order Strategy

    Show small, execute big. But they're not as hidden as you think.

    Iceberg Implementation

    from dataclasses import dataclass
    from typing import Optional
    import random
    
    @dataclass
    class IcebergOrder:
        total_quantity: int
        display_quantity: int
        min_display: int
        max_display: int
        side: str
        limit_price: float
        randomize_display: bool = True
        randomize_price: bool = True
    
    def create_iceberg_slice(order: IcebergOrder) -> dict:
        """
        Create a single visible slice of an iceberg order.
        """
        if order.randomize_display:
            # Randomize display size to prevent detection
            display = random.randint(order.min_display, order.max_display)
        else:
            display = order.display_quantity
    
        if order.randomize_price:
            # Small price randomization (within tick)
            tick_size = 0.01
            price_offset = random.choice([-tick_size, 0, tick_size])
            price = order.limit_price + price_offset
        else:
            price = order.limit_price
    
        return {
            'quantity': display,
            'price': price,
            'hidden_remaining': order.total_quantity - display
        }
    
    def detect_iceberg(
        order_book_snapshots: list,
        threshold_refreshes: int = 5
    ) -> list:
        """
        Detect likely iceberg orders in order book.
    
        If same price level keeps refreshing, it's probably an iceberg.
        """
        price_refresh_count = {}
    
        for snapshot in order_book_snapshots:
            for level in snapshot['bids'] + snapshot['asks']:
                price = level['price']
    
                if price in price_refresh_count:
                    # Check if quantity refreshed (went up)
                    if level['quantity'] > price_refresh_count[price]['last_qty']:
                        price_refresh_count[price]['refreshes'] += 1
                    price_refresh_count[price]['last_qty'] = level['quantity']
                else:
                    price_refresh_count[price] = {
                        'refreshes': 0,
                        'last_qty': level['quantity']
                    }
    
        # Identify likely icebergs
        icebergs = [
            {'price': price, 'refreshes': data['refreshes']}
            for price, data in price_refresh_count.items()
            if data['refreshes'] >= threshold_refreshes
        ]
    
        return icebergs
    

    Iceberg Detection (What Others See)

    Your BehaviorWhat HFT SeesTheir Response
    Fixed display size"Iceberg at $100"Trade ahead
    Regular refreshPredictable timingAnticipate fills
    Same priceClear intentAdverse selection

    Countermeasures:

    • Randomize display size (±30%)
    • Randomize timing
    • Vary price levels
    • Use multiple venues success_rate: "Properly randomized icebergs reduce detection by 40%"
  • name: Smart Order Routing description: Route orders optimally across venues detection: "smart.*order.*rout|sor|venue|exchange" guidance: |

    Smart Order Routing

    Route to the venue that gives best execution, not just best displayed price.

    Multi-Venue Routing Logic

    from dataclasses import dataclass
    from typing import List
    import numpy as np
    
    @dataclass
    class Venue:
        name: str
        displayed_price: float
        displayed_size: int
        fee_per_share: float  # Negative = rebate
        fill_probability: float
        avg_queue_time_ms: float
        hidden_liquidity_estimate: float
    
    def calculate_effective_price(
        venue: Venue,
        side: str,
        urgency: str = 'normal'
    ) -> float:
        """
        Calculate true cost of execution at venue.
        """
        # Base price
        price = venue.displayed_price
    
        # Add fees (subtract rebates)
        price_with_fees = price + venue.fee_per_share if side == 'buy' else price - venue.fee_per_share
    
        # Adjust for fill probability
        # If only 50% fill probability, effective cost is higher
        if urgency == 'high':
            # Value getting filled quickly
            fill_adjustment = (1 - venue.fill_probability) * 0.001
        else:
            fill_adjustment = 0
    
        effective = price_with_fees + fill_adjustment
    
        return effective
    
    def route_order(
        venues: List[Venue],
        order_quantity: int,
        side: str,
        urgency: str = 'normal'
    ) -> List[dict]:
        """
        Route order across venues for best execution.
        """
        routing_plan = []
        remaining = order_quantity
    
        # Score each venue
        venue_scores = []
        for venue in venues:
            effective_price = calculate_effective_price(venue, side, urgency)
            score = -effective_price if side == 'buy' else effective_price  # Lower is better for buys
    
            venue_scores.append({
                'venue': venue,
                'score': score,
                'effective_price': effective_price
            })
    
        # Sort by score (best first)
        venue_scores.sort(key=lambda x: x['score'], reverse=True)
    
        for vs in venue_scores:
            if remaining <= 0:
                break
    
            venue = vs['venue']
            available = venue.displayed_size + int(venue.hidden_liquidity_estimate * venue.displayed_size)
    
            # Route to this venue
            fill_qty = min(remaining, available)
    
            if fill_qty > 0:
                routing_plan.append({
                    'venue': venue.name,
                    'quantity': fill_qty,
                    'expected_price': vs['effective_price'],
                    'fee': venue.fee_per_share * fill_qty
                })
                remaining -= fill_qty
    
        # If remaining, queue at best venue
        if remaining > 0:
            best_venue = venue_scores[0]['venue']
            routing_plan.append({
                'venue': best_venue.name,
                'quantity': remaining,
                'type': 'queue',
                'expected_wait_ms': best_venue.avg_queue_time_ms
            })
    
        return routing_plan
    

    Venue Selection Factors

    FactorWeightWhy
    Price40%Most important
    Fill Rate25%Unfilled orders = timing risk
    Fees/Rebates15%Adds up at scale
    Queue Position10%Time in queue = risk
    Hidden Liquidity10%Bonus if exists
    success_rate: "Good SOR improves execution by 3-8 bps vs single venue"
  • name: Market Impact Modeling description: Predict and minimize price impact from your trades detection: "market.*impact|price.*impact|impact.*model" guidance: |

    Market Impact Models

    Your trade moves the price. Model it to minimize cost.

    Square Root Impact Model

    import numpy as np
    
    def square_root_impact(
        trade_size: float,
        avg_daily_volume: float,
        volatility: float,
        participation_rate: float = None
    ) -> dict:
        """
        Standard square root market impact model.
    
        Impact ≈ σ * √(Q / ADV)
    
        This is the workhorse model used by most practitioners.
        """
        if participation_rate is None:
            # Assume trade over full day
            participation_rate = trade_size / avg_daily_volume
    
        # Impact = volatility * sqrt(participation)
        # Calibrated constant typically 0.5-1.0
        eta = 0.5
    
        impact_pct = eta * volatility * np.sqrt(participation_rate)
    
        return {
            'impact_pct': impact_pct,
            'impact_bps': impact_pct * 10000,
            'participation_rate': participation_rate,
            'trade_size': trade_size,
            'recommendation': (
                'acceptable' if participation_rate < 0.05 else
                'moderate' if participation_rate < 0.10 else
                'high - consider extending duration'
            )
        }
    
    def optimal_execution_horizon(
        trade_size: float,
        avg_daily_volume: float,
        volatility: float,
        urgency: float = 0.5  # 0 = patient, 1 = urgent
    ) -> dict:
        """
        Calculate optimal time to execute trade.
        """
        base_participation = 0.05  # Target 5% participation
    
        # Time needed at base participation
        days_at_base = trade_size / (avg_daily_volume * base_participation)
    
        # Adjust for urgency
        # Higher urgency = accept more impact for faster execution
        urgency_multiplier = 1 + urgency * 4  # 1x to 5x participation
        adjusted_days = days_at_base / urgency_multiplier
    
        # Calculate resulting impact
        adjusted_participation = base_participation * urgency_multiplier
        impact = square_root_impact(
            trade_size, avg_daily_volume, volatility, adjusted_participation
        )
    
        return {
            'optimal_days': max(0.1, adjusted_days),  # Minimum 0.1 days
            'participation_rate': adjusted_participation,
            'expected_impact_bps': impact['impact_bps'],
            'urgency_cost_bps': impact['impact_bps'] - square_root_impact(
                trade_size, avg_daily_volume, volatility, base_participation
            )['impact_bps']
        }
    
    # Example usage
    impact = square_root_impact(
        trade_size=1_000_000,
        avg_daily_volume=10_000_000,
        volatility=0.02  # 2% daily vol
    )
    print(f"Expected impact: {impact['impact_bps']:.1f} bps")
    # With 10% participation and 2% vol:
    # Impact ≈ 0.5 * 0.02 * √0.1 ≈ 31.6 bps
    

    Impact Decay (Temporary vs Permanent)

    ComponentBehaviorModel
    TemporaryReverts after tradeImpact * (1 - decay_rate)^time
    PermanentPrice stays movedTypically 20-50% of temporary
    TotalDepends on horizonTemp + Permanent
    success_rate: "Accurate impact models reduce execution cost 20-30%"

anti_patterns:

  • name: Naive Market Orders description: Sending large market orders without splitting detection: "market.*order|marketOrder" why_harmful: | A $10M market order will walk through the order book, paying progressively worse prices. You might move the market 50-100bps on yourself. This is free money for HFTs who see you coming. what_to_do: | Always split large orders. Use TWAP minimum, VWAP or IS for better execution. Even splitting into 4-5 chunks over 30 minutes dramatically reduces impact.

  • name: Predictable Execution Patterns description: Trading at same time, same size, same way detection: "schedule|every.*day|9:30" why_harmful: | If you always buy at 9:30 with 10% of volume, HFTs will learn your pattern and trade ahead of you. Your execution gets worse every day as they front-run you. what_to_do: | Randomize everything: timing (±15 minutes), size (±20%), aggressiveness, venues. Make your execution impossible to predict.

  • name: Ignoring Venue Toxicity description: Routing to venues with adverse selection detection: "route|venue|exchange" why_harmful: | Some venues have predominantly toxic flow (informed traders). If you always route there, you'll consistently trade against people who know more than you. what_to_do: | Track fill quality by venue. If a venue consistently gives you bad fills (you buy and price immediately drops), stop routing there. Calculate realized spread vs quoted spread by venue.

  • name: Chasing NBBO description: Always hitting the national best bid/offer detection: "nbbo|best.*price|top.*of.*book" why_harmful: | NBBO is often a trap. The displayed size might be 100 shares at the best price, then you pay up for the rest. Also, NBBO doesn't account for fees, rebates, or probability of fill. what_to_do: | Look at effective price including fees. Consider posting (making) rather than taking. A penny worse with a rebate might be better than NBBO with a fee.

  • name: Over-Optimizing for Latency description: Spending resources on microsecond improvements detection: "latency|microsecon|nanosecond" why_harmful: | Unless you're pure HFT, sub-millisecond latency doesn't matter. A 10ms vs 100ms difference is irrelevant for a VWAP algo. You're optimizing the wrong thing. what_to_do: | Focus on algo logic, not latency. Make sure you're measuring the right thing: execution quality (slippage vs benchmark) not speed. Latency matters for HFT, algo matters for everyone else.

handoffs:

  • trigger: "risk|position size|drawdown" to: risk-management-trading context: "Execution complete, assess position risk" provides:

    • Fill prices and quantities
    • Execution quality metrics
    • Slippage vs benchmark
  • trigger: "alpha|signal|strategy" to: quantitative-research context: "Provide execution data for backtest improvement" provides:

    • Realized slippage by asset
    • Fill rate statistics
    • Market impact observations
  • trigger: "technical|entry|level" to: technical-analysis context: "Execution timing relative to technical levels" expects:

    • Entry zone definition
    • Stop and target levels
    • Urgency assessment
  • trigger: "sentiment|news|urgency" to: sentiment-analysis-trading context: "Assess execution urgency from market conditions" expects:

    • News flow assessment
    • Urgency signal
    • Expected volatility