Claude-skill-registry Harness API Operations
Comprehensive guide for Harness Platform API authentication, rate limiting, error handling, and endpoint operations
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/harness-api-operations" ~/.claude/skills/majiayu000-claude-skill-registry-harness-api-operations && rm -rf "$T"
manifest: skills/data/harness-api-operations/SKILL.md
Harness API Operations Skill
When to Use This Skill
Use this skill when you need to:
- Authenticate with Harness APIs using API keys or service account tokens
- Handle API rate limits and implement exponential backoff strategies
- Process API errors with proper retry logic and graceful degradation
- Interact with Harness endpoints for pipelines, templates, services, environments
- Implement API clients with best practices for production systems
- Debug API integration issues and understand error responses
- Build automation around Harness Platform APIs
Perfect for:
- Building CI/CD automation with Harness
- Creating infrastructure templates programmatically
- Implementing GitOps workflows
- Building custom Harness integrations
- Automating pipeline and service management
- Creating monitoring and reporting tools
- Troubleshooting API connectivity issues
Core Capabilities
1. API Authentication Methods
Master the following authentication approaches and credential practices:
- API Key Authentication: Header-based authentication using x-api-key
- Service Account Tokens: OAuth 2.0 bearer tokens for service accounts
- Personal Access Tokens: User-scoped tokens for personal automation
- Environment Variables: Secure credential storage patterns
- Token Rotation: Implementing credential refresh strategies
- Multi-Account Management: Handling multiple Harness accounts
2. Rate Limit Management
Implement robust rate limit handling:
- Read Operations: 1000 requests per minute
- Write Operations: 300 requests per minute
- Exponential Backoff: Progressive retry delays with jitter
- Rate Limit Headers: Parsing X-RateLimit-* headers
- Request Queuing: Managing request bursts
- Concurrent Request Limits: Throttling parallel operations
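The exponential backoff with jitter listed above can be sketched as a small pure function. This is a minimal illustration, not part of any Harness SDK: `backoff_delay` and its parameters are hypothetical names, and "full jitter" (a uniform draw between zero and the capped delay) is only one of several common jitter strategies.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return the wait in seconds before retry number `attempt` (0-based)."""
    if attempt < 0:
        raise ValueError("attempt must be >= 0")
    # Exponential growth, capped so long outages do not produce huge sleeps.
    ceiling = min(base_delay * (2 ** attempt), max_delay)
    # Full jitter: draw uniformly from [0, ceiling] so clients spread out.
    source = rng if rng is not None else random
    return source.uniform(0.0, ceiling)
```

Passing a seeded `random.Random` makes the delays reproducible in tests, while production callers can omit `rng` and use the module-level generator.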
3. Error Handling Strategies
Comprehensive error management:
- HTTP Status Codes: Understanding 4xx and 5xx responses
- Retry Logic: Implementing smart retry with circuit breakers
- Error Correlation: Using correlation IDs for debugging
- Graceful Degradation: Fallback strategies for partial failures
- Error Logging: Structured error logging for observability
- Timeout Management: Request and connection timeout strategies
4. Harness API Endpoints
Complete endpoint reference:
- Pipeline API: Create, update, execute, and monitor pipelines
- Template API: Manage pipeline and step templates
- Service API: CRUD operations for services
- Environment API: Environment and infrastructure management
- IaCM API: Infrastructure as Code Management workspace operations
- Connector API: Managing cloud provider and tool connectors
- Secrets API: Secret management operations
- Project API: Project and organization management
5. Request/Response Patterns
API communication patterns:
- YAML Body Formatting: Proper YAML serialization for Harness objects
- Pagination Handling: Managing large result sets
- Filtering and Sorting: Query parameter patterns
- Bulk Operations: Batch processing strategies
- Webhook Integration: Event-driven patterns
- Streaming Responses: Handling long-running operations
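Pagination handling can be sketched as a generator that keeps requesting pages until a short page comes back. This is a sketch under assumptions: `fetch_page` is a hypothetical callable that you supply, and the `{"data": {"content": [...]}}` shape mirrors how the list example later in this guide reads its response; adjust both to the real endpoint.

```python
from typing import Any, Callable, Dict, Iterator


def iter_pages(
    fetch_page: Callable[[int, int], Dict[str, Any]],
    page_size: int = 50,
    max_pages: int = 1000,
) -> Iterator[Any]:
    """Yield every item from a page-numbered (0-indexed) listing.

    `fetch_page(page, size)` is a hypothetical callable returning a dict
    shaped like {"data": {"content": [...]}}.
    """
    for page in range(max_pages):
        body = fetch_page(page, page_size)
        items = body.get("data", {}).get("content", [])
        yield from items
        # A short (or empty) page means there is nothing left to fetch.
        if len(items) < page_size:
            return
```

The `max_pages` cap is a safety net so a misbehaving endpoint cannot keep the loop running indefinitely.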
Authentication Patterns
Authentication Matrix
| Method | Use Case | Header Format | Scope | Rotation |
|---|---|---|---|---|
| API Key | Service automation | `x-api-key: <api-key>` | Account/Org/Project | Manual |
| Service Account Token | CI/CD integration | `Authorization: Bearer <token>` | Configurable RBAC | Automatic |
| Personal Access Token | User automation | `x-api-key: <pat-token>` | User permissions | Manual |
| OAuth 2.0 Client | Third-party apps | | Delegated | Automatic |
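The two header styles used by the code examples in this guide (`x-api-key` for API keys and `Authorization: Bearer` for service account tokens) can be produced by one small helper. This is a sketch: `build_auth_headers` and the `"api_key"` / `"bearer"` labels are illustrative names chosen here, not Harness terminology.

```python
from typing import Dict


def build_auth_headers(method: str, credential: str) -> Dict[str, str]:
    """Return the authentication header for the given method.

    `method` is a hypothetical label: "api_key" or "bearer".
    """
    if not credential:
        raise ValueError("credential must not be empty")
    if method == "api_key":
        # Header-based API key authentication.
        return {"x-api-key": credential}
    if method == "bearer":
        # Bearer token, as used for service account tokens in this guide.
        return {"Authorization": f"Bearer {credential}"}
    raise ValueError(f"unsupported auth method: {method!r}")
```

Keeping header construction in one place makes it easier to rotate credentials or switch methods without touching every request call.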
Authentication Code Examples
Example 1: API Key Authentication (Python)
    import os
    from typing import Any, Dict, Optional

    import requests


    class HarnessClient:
        """Harness API client with API key authentication."""

        def __init__(
            self,
            account_id: str,
            api_key: Optional[str] = None,
            base_url: str = "https://app.harness.io",
        ):
            """
            Initialize Harness API client.

            Args:
                account_id: Harness account identifier
                api_key: API key (defaults to HARNESS_API_KEY env var)
                base_url: Harness API base URL
            """
            self.account_id = account_id
            self.api_key = api_key or os.getenv("HARNESS_API_KEY")
            self.base_url = base_url

            if not self.api_key:
                raise ValueError("API key must be provided or set in HARNESS_API_KEY")

        def _get_headers(self) -> Dict[str, str]:
            """Generate request headers with authentication."""
            return {
                "x-api-key": self.api_key,
                "Content-Type": "application/yaml",
                "Accept": "application/json",
            }

        def _make_request(
            self,
            method: str,
            endpoint: str,
            **kwargs,
        ) -> requests.Response:
            """
            Make authenticated API request.

            Args:
                method: HTTP method (GET, POST, PUT, DELETE)
                endpoint: API endpoint path
                **kwargs: Additional requests arguments

            Returns:
                Response object
            """
            url = f"{self.base_url}{endpoint}"

            # Merge caller-supplied headers over the defaults; passing "headers"
            # both explicitly and via **kwargs would raise a TypeError.
            headers = {**self._get_headers(), **kwargs.pop("headers", {})}

            # Add account identifier to query params (copy so the caller's
            # dict is not mutated)
            params = dict(kwargs.pop("params", None) or {})
            params["accountIdentifier"] = self.account_id

            # Avoid hanging forever on a stalled connection
            kwargs.setdefault("timeout", 30)

            return requests.request(
                method=method,
                url=url,
                headers=headers,
                params=params,
                **kwargs,
            )

        def get_pipeline(
            self,
            org_id: str,
            project_id: str,
            pipeline_id: str,
        ) -> Dict[str, Any]:
            """
            Retrieve pipeline by identifier.

            Args:
                org_id: Organization identifier
                project_id: Project identifier
                pipeline_id: Pipeline identifier

            Returns:
                Pipeline configuration dictionary
            """
            endpoint = f"/pipeline/api/pipelines/v2/{pipeline_id}"
            params = {
                "orgIdentifier": org_id,
                "projectIdentifier": project_id,
            }
            response = self._make_request("GET", endpoint, params=params)
            response.raise_for_status()
            return response.json()


    # Usage
    client = HarnessClient(
        account_id="your_account_id",
        api_key=os.getenv("HARNESS_API_KEY"),
    )
    pipeline = client.get_pipeline(
        org_id="default",
        project_id="my_project",
        pipeline_id="my_pipeline",
    )
    print(f"Pipeline: {pipeline['data']['name']}")
Example 2: Service Account Token Authentication (Node.js)
    const axios = require('axios');

    class HarnessClient {
      /**
       * Harness API client with service account token authentication.
       */
      constructor(accountId, options = {}) {
        this.accountId = accountId;
        this.baseURL = options.baseURL || 'https://app.harness.io';
        this.token = options.token || process.env.HARNESS_TOKEN;

        if (!this.token) {
          throw new Error('Service account token must be provided or set in HARNESS_TOKEN');
        }

        this.client = axios.create({
          baseURL: this.baseURL,
          headers: {
            'Authorization': `Bearer ${this.token}`,
            'Content-Type': 'application/yaml',
            'Accept': 'application/json'
          }
        });
      }

      /**
       * Add account identifier to request params.
       */
      _addAccountParam(params = {}) {
        return { ...params, accountIdentifier: this.accountId };
      }

      /**
       * Get pipeline by identifier.
       */
      async getPipeline(orgId, projectId, pipelineId) {
        const endpoint = `/pipeline/api/pipelines/v2/${pipelineId}`;
        const params = this._addAccountParam({
          orgIdentifier: orgId,
          projectIdentifier: projectId
        });

        try {
          const response = await this.client.get(endpoint, { params });
          return response.data;
        } catch (error) {
          console.error('Failed to fetch pipeline:', error.message);
          throw error;
        }
      }

      /**
       * Execute pipeline with runtime inputs.
       */
      async executePipeline(orgId, projectId, pipelineId, runtimeInputs = {}) {
        const endpoint = `/pipeline/api/pipelines/v2/execute/${pipelineId}`;
        const params = this._addAccountParam({
          orgIdentifier: orgId,
          projectIdentifier: projectId
        });

        try {
          const response = await this.client.post(
            endpoint,
            { runtimeInputYaml: runtimeInputs },
            { params }
          );
          return response.data;
        } catch (error) {
          console.error('Failed to execute pipeline:', error.message);
          throw error;
        }
      }
    }

    // Usage
    const client = new HarnessClient('your_account_id', {
      token: process.env.HARNESS_TOKEN
    });

    (async () => {
      try {
        const pipeline = await client.getPipeline('default', 'my_project', 'my_pipeline');
        console.log(`Pipeline: ${pipeline.data.name}`);

        const execution = await client.executePipeline('default', 'my_project', 'my_pipeline', {
          environment: 'production'
        });
        console.log(`Execution ID: ${execution.data.planExecution.uuid}`);
      } catch (error) {
        console.error('Error:', error);
      }
    })();
Example 3: Environment Variable Best Practices
    # .env file (never commit to version control)
    HARNESS_ACCOUNT_ID=your_account_id
    HARNESS_API_KEY=pat.your_account_id.your_token
    HARNESS_BASE_URL=https://app.harness.io

    # For service account tokens
    HARNESS_TOKEN=your_service_account_token

    # Organization and project defaults
    HARNESS_ORG_ID=default
    HARNESS_PROJECT_ID=my_project

    # Optional: Multiple environments
    HARNESS_API_KEY_DEV=pat.dev_account.dev_token
    HARNESS_API_KEY_STAGING=pat.staging_account.staging_token
    HARNESS_API_KEY_PROD=pat.prod_account.prod_token
    # Loading environment variables securely
    import os

    from dotenv import load_dotenv

    load_dotenv()  # Load .env file


    class HarnessConfig:
        """Centralized Harness configuration from environment variables."""

        def __init__(self, environment: str = "prod"):
            self.environment = environment
            self.account_id = self._get_required("HARNESS_ACCOUNT_ID")
            self.base_url = os.getenv("HARNESS_BASE_URL", "https://app.harness.io")
            self.org_id = os.getenv("HARNESS_ORG_ID", "default")
            self.project_id = os.getenv("HARNESS_PROJECT_ID")

            # Environment-specific API key, falling back to the shared key
            env_key = f"HARNESS_API_KEY_{environment.upper()}"
            self.api_key = os.getenv(env_key) or self._get_required("HARNESS_API_KEY")

        def _get_required(self, key: str) -> str:
            """Get required environment variable or raise error."""
            value = os.getenv(key)
            if not value:
                raise ValueError(f"Required environment variable {key} is not set")
            return value


    # Usage (HarnessClient is the class from Example 1)
    config = HarnessConfig(environment="prod")
    client = HarnessClient(
        account_id=config.account_id,
        api_key=config.api_key,
        base_url=config.base_url,
    )
Rate Limit Management
Rate Limit Rules
| Operation Type | Limit | Window | Reset Behavior |
|---|---|---|---|
| Read Operations | 1000 requests | 1 minute | Rolling window |
| Write Operations | 300 requests | 1 minute | Rolling window |
| Template Operations | 100 requests | 1 minute | Rolling window |
| Webhook Triggers | 500 requests | 1 minute | Rolling window |
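A client can stay under a rolling-window limit like those in the table by tracking the timestamps of its own recent requests. The class below is a minimal, single-threaded sketch: `RollingWindowLimiter` is a hypothetical helper, and the server's actual accounting may differ from this client-side approximation.

```python
import time
from collections import deque
from typing import Callable, Deque


class RollingWindowLimiter:
    """Client-side limiter: at most `limit` calls per `window` seconds."""

    def __init__(
        self,
        limit: int,
        window: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.limit = limit
        self.window = window
        self.clock = clock
        self._stamps: Deque[float] = deque()

    def wait_time(self) -> float:
        """Seconds to wait before the next call is allowed (0.0 if none)."""
        now = self.clock()
        # Drop timestamps that have left the rolling window.
        while self._stamps and now - self._stamps[0] >= self.window:
            self._stamps.popleft()
        if len(self._stamps) < self.limit:
            return 0.0
        # The oldest request must age out before another one fits.
        return self.window - (now - self._stamps[0])

    def record(self) -> None:
        """Register that a request was just sent."""
        self._stamps.append(self.clock())
```

A caller would sleep for `wait_time()` seconds, send the request, then call `record()`. Using separate instances for reads (for example `limit=1000`) and writes (`limit=300`) matches the per-operation limits above.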
Rate Limit Response Headers
    X-RateLimit-Limit: 1000        # Total limit for the window
    X-RateLimit-Remaining: 847     # Remaining requests
    X-RateLimit-Reset: 1642694400  # Unix timestamp when limit resets
    Retry-After: 30                # Seconds to wait before retry (429 response)
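These headers can be turned into a single "how long should I wait" value. The function below is a sketch under assumptions: `seconds_until_retry` is a hypothetical helper, it expects the exact header casing shown above (a plain `dict` is case-sensitive, unlike the header objects of most HTTP libraries), and it handles only the numeric form of `Retry-After`.

```python
from typing import Mapping, Optional


def seconds_until_retry(headers: Mapping[str, str], now: float) -> Optional[float]:
    """Return how long to wait, or None if the headers give no guidance.

    `now` is the current Unix time in seconds.
    """
    retry_after = headers.get("Retry-After")
    if retry_after is not None:
        try:
            return max(0.0, float(retry_after))
        except ValueError:
            pass  # Retry-After may also be an HTTP date; not handled here.

    reset = headers.get("X-RateLimit-Reset")
    remaining = headers.get("X-RateLimit-Remaining")
    if reset is not None and remaining is not None:
        try:
            if int(remaining) > 0:
                return 0.0  # Budget left in this window: no need to wait.
            return max(0.0, float(reset) - now)
        except ValueError:
            return None
    return None
```

Preferring `Retry-After` over the reset timestamp follows the usual convention that an explicit server instruction wins over a value the client derives itself.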
Rate Limit Handling Examples
Example 1: Exponential Backoff with Jitter (Python)
    import os
    import random
    import time
    from functools import wraps
    from typing import Any, Callable

    import requests


    class RateLimitError(Exception):
        """Rate limit exceeded error."""


    def exponential_backoff_retry(
        max_retries: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        jitter: bool = True,
    ):
        """
        Decorator for exponential backoff retry with jitter.

        Args:
            max_retries: Maximum number of retry attempts
            base_delay: Initial delay in seconds
            max_delay: Maximum delay between retries
            jitter: Add random jitter to prevent thundering herd
        """

        def decorator(func: Callable) -> Callable:
            @wraps(func)
            def wrapper(*args, **kwargs) -> Any:
                # One initial attempt plus up to max_retries retries
                for attempt in range(max_retries + 1):
                    try:
                        return func(*args, **kwargs)
                    except requests.exceptions.HTTPError as e:
                        status = e.response.status_code if e.response is not None else None
                        if status != 429:
                            raise  # Re-raise non-rate-limit errors

                        if attempt == max_retries:
                            raise RateLimitError(
                                f"Rate limit exceeded after {max_retries} retries"
                            ) from e

                        # Calculate delay with exponential backoff
                        delay = min(base_delay * (2 ** attempt), max_delay)

                        # Add jitter to prevent synchronized retries
                        if jitter:
                            delay *= 0.5 + random.random()

                        # A server-provided Retry-After takes precedence
                        retry_after = e.response.headers.get("Retry-After")
                        if retry_after:
                            try:
                                delay = float(retry_after)
                            except ValueError:
                                pass  # Use calculated delay

                        print(f"Rate limited. Retrying in {delay:.2f} seconds...")
                        time.sleep(delay)

            return wrapper

        return decorator


    class HarnessClientWithRetry(HarnessClient):
        """Harness client with automatic retry logic (extends Example 1's client)."""

        @exponential_backoff_retry(max_retries=5, base_delay=2.0)
        def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
            """Make request with automatic retry on rate limit."""
            response = super()._make_request(method, endpoint, **kwargs)

            # Log rate limit status when both headers are present
            remaining = response.headers.get("X-RateLimit-Remaining")
            limit = response.headers.get("X-RateLimit-Limit")
            if remaining is not None and limit is not None:
                if int(remaining) < int(limit) * 0.1:  # Less than 10% remaining
                    print(f"Warning: Rate limit low ({remaining}/{limit} remaining)")

            # Raises HTTPError on 4xx/5xx so the decorator can see 429s
            response.raise_for_status()
            return response


    # Usage
    client = HarnessClientWithRetry(
        account_id="your_account_id",
        api_key=os.getenv("HARNESS_API_KEY"),
    )

    # Automatic retry on rate limit
    pipeline = client.get_pipeline("default", "my_project", "my_pipeline")
Example 2: Request Queue with Rate Limiting (Node.js)
    // require() works with p-queue v6; v7 and later are ESM-only and need import.
    const PQueue = require('p-queue').default;

    class RateLimitedHarnessClient extends HarnessClient {
      /**
       * Harness client with request queuing and rate limiting.
       */
      constructor(accountId, options = {}) {
        super(accountId, options);

        // Create separate queues for read and write operations
        this.readQueue = new PQueue({
          concurrency: 10,   // Max concurrent read requests
          interval: 60000,   // 1 minute window
          intervalCap: 900   // Max 900 requests per minute (leaving buffer)
        });

        this.writeQueue = new PQueue({
          concurrency: 5,    // Max concurrent write requests
          interval: 60000,   // 1 minute window
          intervalCap: 270   // Max 270 requests per minute (leaving buffer)
        });
      }

      /**
       * Make rate-limited request using appropriate queue.
       */
      async _makeRateLimitedRequest(method, endpoint, options = {}, maxRetries = 5) {
        const isWriteOperation = ['POST', 'PUT', 'PATCH', 'DELETE'].includes(method.toUpperCase());
        const queue = isWriteOperation ? this.writeQueue : this.readQueue;

        // Retry outside the queued task: re-queuing from inside a running task
        // would hold a concurrency slot while waiting and can stall the queue.
        for (let attempt = 0; ; attempt++) {
          try {
            const response = await queue.add(() =>
              this.client.request({ method, url: endpoint, ...options })
            );

            // Log rate limit status
            const remaining = Number(response.headers['x-ratelimit-remaining']);
            const limit = Number(response.headers['x-ratelimit-limit']);
            if (Number.isFinite(remaining) && limit > 0) {
              const percentRemaining = (remaining / limit) * 100;
              if (percentRemaining < 10) {
                console.warn(`Rate limit warning: ${remaining}/${limit} remaining (${percentRemaining.toFixed(1)}%)`);
              }
            }

            return response;
          } catch (error) {
            if (error.response?.status !== 429 || attempt >= maxRetries) {
              throw error;
            }
            const retryAfter = Number(error.response.headers['retry-after']) || 30;
            console.log(`Rate limited. Waiting ${retryAfter}s before retry...`);

            // Wait and retry
            await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
          }
        }
      }

      /**
       * Get pipeline with rate limiting.
       */
      async getPipeline(orgId, projectId, pipelineId) {
        const endpoint = `/pipeline/api/pipelines/v2/${pipelineId}`;
        const params = this._addAccountParam({
          orgIdentifier: orgId,
          projectIdentifier: projectId
        });

        const response = await this._makeRateLimitedRequest('GET', endpoint, { params });
        return response.data;
      }

      /**
       * Batch get multiple pipelines with rate limiting.
       */
      async getPipelines(orgId, projectId, pipelineIds) {
        const promises = pipelineIds.map(pipelineId =>
          this.getPipeline(orgId, projectId, pipelineId)
        );
        return Promise.all(promises);
      }
    }

    // Usage
    const client = new RateLimitedHarnessClient('your_account_id', {
      token: process.env.HARNESS_TOKEN
    });

    (async () => {
      // Fetch 100 pipelines - automatically queued and rate-limited
      const pipelineIds = Array.from({ length: 100 }, (_, i) => `pipeline_${i}`);
      const pipelines = await client.getPipelines('default', 'my_project', pipelineIds);
      console.log(`Fetched ${pipelines.length} pipelines`);
    })();
Example 3: Circuit Breaker Pattern
    import os
    from datetime import datetime, timedelta
    from enum import Enum
    from typing import Any, Callable

    import requests


    class CircuitState(Enum):
        """Circuit breaker states."""
        CLOSED = "closed"        # Normal operation
        OPEN = "open"            # Failing, reject requests
        HALF_OPEN = "half_open"  # Testing if service recovered


    class CircuitBreaker:
        """
        Circuit breaker for API calls to prevent cascade failures.
        """

        def __init__(
            self,
            failure_threshold: int = 5,
            timeout: int = 60,
            expected_exception: type = requests.exceptions.HTTPError,
        ):
            """
            Initialize circuit breaker.

            Args:
                failure_threshold: Number of failures before opening circuit
                timeout: Seconds to wait before attempting recovery
                expected_exception: Exception type to catch
            """
            self.failure_threshold = failure_threshold
            self.timeout = timeout
            self.expected_exception = expected_exception
            self.failure_count = 0
            self.last_failure_time = None
            self.state = CircuitState.CLOSED

        def call(self, func: Callable, *args, **kwargs) -> Any:
            """
            Execute function with circuit breaker protection.

            Args:
                func: Function to execute
                *args: Function arguments
                **kwargs: Function keyword arguments

            Returns:
                Function result

            Raises:
                Exception: If circuit is open or function fails
            """
            if self.state == CircuitState.OPEN:
                if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                    # Try to recover
                    self.state = CircuitState.HALF_OPEN
                    print("Circuit breaker: Attempting recovery (HALF_OPEN)")
                else:
                    raise Exception(
                        f"Circuit breaker OPEN. Service unavailable. "
                        f"Retry after {self.timeout}s"
                    )

            try:
                result = func(*args, **kwargs)

                # Success - reset failure count
                if self.state == CircuitState.HALF_OPEN:
                    print("Circuit breaker: Recovery successful (CLOSED)")
                self.failure_count = 0
                self.state = CircuitState.CLOSED
                return result

            except self.expected_exception:
                self.failure_count += 1
                self.last_failure_time = datetime.now()

                if self.failure_count >= self.failure_threshold:
                    self.state = CircuitState.OPEN
                    print(
                        f"Circuit breaker: Threshold reached ({self.failure_count} failures). "
                        f"Circuit OPEN"
                    )
                raise


    class HarnessClientWithCircuitBreaker(HarnessClientWithRetry):
        """Harness client with circuit breaker pattern."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.circuit_breaker = CircuitBreaker(
                failure_threshold=5,
                timeout=60,
            )

        def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
            """Make request with circuit breaker protection."""
            return self.circuit_breaker.call(
                super()._make_request,
                method,
                endpoint,
                **kwargs,
            )


    # Usage
    client = HarnessClientWithCircuitBreaker(
        account_id="your_account_id",
        api_key=os.getenv("HARNESS_API_KEY"),
    )

    # Protected by circuit breaker
    try:
        pipeline = client.get_pipeline("default", "my_project", "my_pipeline")
    except Exception as e:
        print(f"Request failed: {e}")
Error Handling
HTTP Status Code Reference
| Status Code | Meaning | Retry Strategy | Common Causes |
|---|---|---|---|
| 400 Bad Request | Invalid request format | Do not retry | Malformed YAML, missing required fields |
| 401 Unauthorized | Authentication failed | Do not retry | Invalid or expired API key/token |
| 403 Forbidden | Insufficient permissions | Do not retry | Missing RBAC permissions |
| 404 Not Found | Resource doesn't exist | Do not retry | Wrong identifier or deleted resource |
| 409 Conflict | Resource conflict | Retry after resolution | Concurrent modification |
| 422 Unprocessable Entity | Validation failed | Do not retry | Invalid field values |
| 429 Too Many Requests | Rate limit exceeded | Retry with backoff | Too many requests |
| 500 Internal Server Error | Server error | Retry with backoff | Temporary server issue |
| 502 Bad Gateway | Gateway error | Retry with backoff | Proxy or load balancer issue |
| 503 Service Unavailable | Service overloaded | Retry with backoff | Temporary unavailability |
| 504 Gateway Timeout | Request timeout | Retry with backoff | Long-running operation |
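The retry column of the table above can be encoded directly as a lookup. This is a small sketch: `retry_strategy` and its three return labels are hypothetical names chosen here, and any status code not listed in the table falls through to "none".

```python
# Status codes grouped by the retry guidance in the table above.
RETRY_WITH_BACKOFF = frozenset({429, 500, 502, 503, 504})
RETRY_AFTER_RESOLUTION = frozenset({409})


def retry_strategy(status_code: int) -> str:
    """Map an HTTP status code to 'backoff', 'after_resolution', or 'none'."""
    if status_code in RETRY_WITH_BACKOFF:
        return "backoff"
    if status_code in RETRY_AFTER_RESOLUTION:
        return "after_resolution"
    # 400, 401, 403, 404, 422 and anything unlisted: do not retry.
    return "none"
```

Keeping the sets at module level makes the policy easy to audit against the table and to adjust in one place.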
Error Response Format
    {
      "status": "ERROR",
      "code": "INVALID_REQUEST",
      "message": "Pipeline with identifier 'invalid_pipeline' does not exist",
      "correlationId": "abc123-def456-ghi789",
      "detailedMessage": "The requested pipeline 'invalid_pipeline' was not found in project 'my_project' under organization 'default'",
      "responseMessages": [
        {
          "code": "RESOURCE_NOT_FOUND",
          "level": "ERROR",
          "message": "Pipeline not found",
          "failureTypes": ["APPLICATION_ERROR"]
        }
      ]
    }
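An error body of this shape can be parsed into a small typed object so that callers do not index into raw dictionaries. This is a minimal sketch: `HarnessError` and `parse_error_body` are hypothetical names, only the fields shown in the sample above are read, and missing fields fall back to placeholder values.

```python
import json
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class HarnessError:
    """Subset of the error body shown above (hypothetical container)."""
    status: str
    code: str
    message: str
    correlation_id: Optional[str] = None
    response_codes: List[str] = field(default_factory=list)


def parse_error_body(raw: str) -> HarnessError:
    """Parse a JSON error body, tolerating malformed or partial input."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        data = {}
    if not isinstance(data, dict):
        data = {}
    messages = data.get("responseMessages") or []
    return HarnessError(
        status=data.get("status", "ERROR"),
        code=data.get("code", "UNKNOWN"),
        message=data.get("message", "Unknown error"),
        correlation_id=data.get("correlationId"),
        # Keep only the per-message codes; skip entries that are not objects.
        response_codes=[m.get("code", "UNKNOWN") for m in messages if isinstance(m, dict)],
    )
```

Logging `correlation_id` alongside `code` gives a support ticket the identifier needed to trace the failing request.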
Error Handling Examples
Example 1: Comprehensive Error Handler (Python)
    import logging
    import os
    from enum import Enum
    from typing import Any, Dict, Optional

    import requests


    class ErrorSeverity(Enum):
        """Error severity levels."""
        TRANSIENT = "transient"  # Temporary, retry
        PERMANENT = "permanent"  # Permanent, do not retry
        FATAL = "fatal"          # Fatal, terminate


    class HarnessAPIError(Exception):
        """Base exception for Harness API errors."""

        def __init__(
            self,
            message: str,
            status_code: int,
            response: Optional[Dict[str, Any]] = None,
            correlation_id: Optional[str] = None,
        ):
            self.message = message
            self.status_code = status_code
            self.response = response or {}
            self.correlation_id = correlation_id
            super().__init__(self.message)

        @property
        def severity(self) -> ErrorSeverity:
            """Determine error severity based on status code."""
            if self.status_code in [429, 500, 502, 503, 504]:
                return ErrorSeverity.TRANSIENT
            elif self.status_code in [400, 401, 403, 404, 422]:
                return ErrorSeverity.PERMANENT
            else:
                return ErrorSeverity.FATAL

        @property
        def should_retry(self) -> bool:
            """Determine if error should be retried."""
            return self.severity == ErrorSeverity.TRANSIENT


    class HarnessErrorHandler:
        """Centralized error handling for Harness API operations."""

        def __init__(self):
            self.logger = logging.getLogger(__name__)

        def handle_response(self, response: requests.Response) -> Dict[str, Any]:
            """
            Handle API response and raise appropriate errors.

            Args:
                response: Requests response object

            Returns:
                Response data dictionary

            Raises:
                HarnessAPIError: For any error responses
            """
            try:
                response.raise_for_status()
            except requests.exceptions.HTTPError as e:
                error_data = self._extract_error_data(response)
                correlation_id = response.headers.get('X-Correlation-Id')

                # Log error with context
                self.logger.error(
                    f"Harness API error: {error_data['message']} "
                    f"(Status: {response.status_code}, "
                    f"Correlation ID: {correlation_id})"
                )

                raise HarnessAPIError(
                    message=error_data['message'],
                    status_code=response.status_code,
                    response=error_data,
                    correlation_id=correlation_id,
                ) from e

            # Some successful responses carry no body
            return response.json() if response.content else {}

        def _extract_error_data(self, response: requests.Response) -> Dict[str, Any]:
            """Extract error information from response."""
            try:
                data = response.json()
                # Handle different error response formats
                if isinstance(data, dict):
                    return {
                        'message': data.get('message', 'Unknown error'),
                        'code': data.get('code', 'UNKNOWN'),
                        'detailed_message': data.get('detailedMessage'),
                        'response_messages': data.get('responseMessages', []),
                    }
            except ValueError:
                pass

            # Fallback to status code message
            return {
                'message': response.reason,
                'code': str(response.status_code),
                'detailed_message': response.text[:500] if response.text else None,
                'response_messages': [],
            }


    class RobustHarnessClient(HarnessClientWithRetry):
        """Harness client with comprehensive error handling."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.error_handler = HarnessErrorHandler()
            self.logger = logging.getLogger(__name__)

            # Configure logging
            logging.basicConfig(
                level=logging.INFO,
                format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            )

        def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
            """Make request with error handling and return the parsed JSON body."""
            try:
                response = super()._make_request(method, endpoint, **kwargs)
            except requests.exceptions.HTTPError as e:
                # The parent class already called raise_for_status(); recover
                # the response so it can be turned into a HarnessAPIError.
                # (RateLimitError from exhausted retries propagates unchanged.)
                if e.response is None:
                    raise
                response = e.response

            try:
                return self.error_handler.handle_response(response)
            except HarnessAPIError as e:
                # Handle specific error cases
                if e.status_code == 404:
                    self.logger.warning(f"Resource not found: {endpoint}")
                elif e.status_code == 401:
                    self.logger.error("Authentication failed. Check API key/token.")
                elif e.status_code == 403:
                    self.logger.error(
                        f"Insufficient permissions for {method} {endpoint}. "
                        f"Check RBAC settings."
                    )
                raise

        def get_pipeline(self, org_id: str, project_id: str, pipeline_id: str) -> Dict[str, Any]:
            """Retrieve pipeline; overridden because _make_request returns a dict here."""
            endpoint = f"/pipeline/api/pipelines/v2/{pipeline_id}"
            params = {"orgIdentifier": org_id, "projectIdentifier": project_id}
            return self._make_request("GET", endpoint, params=params)


    # Usage with error handling
    client = RobustHarnessClient(
        account_id="your_account_id",
        api_key=os.getenv("HARNESS_API_KEY"),
    )

    try:
        pipeline = client.get_pipeline("default", "my_project", "my_pipeline")
        print(f"Successfully fetched pipeline: {pipeline['data']['name']}")
    except HarnessAPIError as e:
        if e.severity == ErrorSeverity.PERMANENT:
            print(f"Permanent error (do not retry): {e.message}")
            print(f"Correlation ID: {e.correlation_id}")
        elif e.severity == ErrorSeverity.TRANSIENT:
            print(f"Transient error (safe to retry): {e.message}")
        else:
            print(f"Fatal error: {e.message}")
            raise
Example 2: Graceful Degradation Pattern
    import logging
    import os
    from typing import Any, Dict, List, Optional


    class PipelineService:
        """
        Service layer with graceful degradation for pipeline operations.
        """

        def __init__(self, client: RobustHarnessClient):
            self.client = client
            self.logger = logging.getLogger(__name__)
            self.cache = {}  # Simple in-memory cache

        def get_pipeline_with_fallback(
            self,
            org_id: str,
            project_id: str,
            pipeline_id: str,
            use_cache: bool = True,
        ) -> Optional[Dict[str, Any]]:
            """
            Get pipeline with fallback to cached version on error.

            Args:
                org_id: Organization identifier
                project_id: Project identifier
                pipeline_id: Pipeline identifier
                use_cache: Whether to use cached fallback

            Returns:
                Pipeline data or None if unavailable
            """
            cache_key = f"{org_id}/{project_id}/{pipeline_id}"

            try:
                # Try to fetch from API
                pipeline = self.client.get_pipeline(org_id, project_id, pipeline_id)

                # Update cache on success
                if use_cache:
                    self.cache[cache_key] = pipeline
                return pipeline

            except HarnessAPIError as e:
                if e.severity == ErrorSeverity.TRANSIENT and use_cache:
                    # Fall back to cache for transient errors
                    cached_pipeline = self.cache.get(cache_key)
                    if cached_pipeline:
                        self.logger.warning(
                            f"API error, using cached pipeline data: {e.message}"
                        )
                        return cached_pipeline
                    else:
                        self.logger.error(
                            f"API error and no cached data available: {e.message}"
                        )
                        return None
                else:
                    # Permanent errors or cache disabled
                    self.logger.error(f"Failed to fetch pipeline: {e.message}")
                    return None

        def batch_get_pipelines(
            self,
            org_id: str,
            project_id: str,
            pipeline_ids: List[str],
            fail_fast: bool = False,
        ) -> List[Optional[Dict[str, Any]]]:
            """
            Batch fetch pipelines with error isolation.

            Args:
                org_id: Organization identifier
                project_id: Project identifier
                pipeline_ids: List of pipeline identifiers
                fail_fast: Stop on first error if True

            Returns:
                List of pipeline data (None for failed fetches)
            """
            results = []
            errors = []

            for pipeline_id in pipeline_ids:
                try:
                    pipeline = self.get_pipeline_with_fallback(
                        org_id, project_id, pipeline_id
                    )
                    results.append(pipeline)
                except Exception as e:
                    # Reached only for errors the fallback does not absorb
                    # (for example connection failures)
                    errors.append({
                        'pipeline_id': pipeline_id,
                        'error': str(e),
                    })
                    if fail_fast:
                        raise
                    else:
                        # Continue processing remaining pipelines
                        results.append(None)

            # Log summary
            success_count = sum(1 for r in results if r is not None)
            self.logger.info(
                f"Batch fetch complete: {success_count}/{len(pipeline_ids)} successful"
            )
            if errors:
                self.logger.warning(f"Errors encountered: {len(errors)}")
                for error in errors:
                    self.logger.warning(
                        f"  - {error['pipeline_id']}: {error['error']}"
                    )

            return results


    # Usage
    client = RobustHarnessClient(
        account_id="your_account_id",
        api_key=os.getenv("HARNESS_API_KEY"),
    )
    service = PipelineService(client)

    # Single pipeline with fallback
    pipeline = service.get_pipeline_with_fallback("default", "my_project", "my_pipeline")
    if pipeline:
        print(f"Pipeline: {pipeline['data']['name']}")
    else:
        print("Pipeline unavailable")

    # Batch fetch with error isolation
    pipeline_ids = ["pipeline1", "pipeline2", "pipeline3"]
    pipelines = service.batch_get_pipelines("default", "my_project", pipeline_ids)
    print(f"Fetched {sum(1 for p in pipelines if p)} pipelines")
Harness API Endpoints
Complete Endpoint Reference
Pipeline API
Base: /pipeline/api/pipelines/v2
| Endpoint | Method | Description | Rate Limit |
|---|---|---|---|
| `/{pipelineId}` | GET | Get pipeline by ID | Read |
| `/` | POST | Create pipeline | Write |
| `/{pipelineId}` | PUT | Update pipeline | Write |
| `/{pipelineId}` | DELETE | Delete pipeline | Write |
| `/execute/{pipelineId}` | POST | Execute pipeline | Write |
| `/execution/{executionId}` | GET | Get execution status | Read |
| `/list` | POST | List pipelines (with filters) | Read |
| | GET | Get pipeline summary | Read |
Template API
Base: /template/api/templates
| Endpoint | Method | Description | Rate Limit |
|---|---|---|---|
| | GET | Get template by ID | Read |
| | POST | Create template | Write |
| | PUT | Update template | Write |
| | DELETE | Delete template | Write |
| | POST | List templates | Read |
| | GET | Get template versions | Read |
Service API
Base: /ng/api/servicesV2
| Endpoint | Method | Description | Rate Limit |
|---|---|---|---|
| | GET | Get service by ID | Read |
| | POST | Create service | Write |
| | PUT | Update service | Write |
| | DELETE | Delete service | Write |
| | GET | List services | Read |
Environment API
Base: /ng/api/environmentsV2
| Endpoint | Method | Description | Rate Limit |
|---|---|---|---|
| | GET | Get environment by ID | Read |
| | POST | Create environment | Write |
| | PUT | Update environment | Write |
| | DELETE | Delete environment | Write |
| | GET | List environments | Read |
IaCM API
Base: /iacm/api/workspaces
| Endpoint | Method | Description | Rate Limit |
|---|---|---|---|
| | GET | Get workspace by ID | Read |
| | POST | Create workspace | Write |
| | PUT | Update workspace | Write |
| | DELETE | Delete workspace | Write |
| | POST | Trigger workspace run | Write |
| | GET | Get run status | Read |
Connector API
Base: /ng/api/connectors
| Endpoint | Method | Description | Rate Limit |
|---|---|---|---|
| | GET | Get connector by ID | Read |
| | POST | Create connector | Write |
| | PUT | Update connector | Write |
| | DELETE | Delete connector | Write |
| | POST | Test connector | Write |
| | GET | List connectors | Read |
Endpoint Usage Examples
Example 1: Complete Pipeline CRUD Operations
    import os
    from typing import Any, Dict, Optional

    import yaml


    class PipelineManager:
        """Manager for pipeline CRUD operations."""

        def __init__(self, client: RobustHarnessClient, org_id: str, project_id: str):
            self.client = client
            self.org_id = org_id
            self.project_id = project_id

        def _scope_params(self) -> Dict[str, Any]:
            """Query parameters identifying the organization and project."""
            return {
                "orgIdentifier": self.org_id,
                "projectIdentifier": self.project_id,
            }

        def create_pipeline(self, pipeline_yaml: str) -> Dict[str, Any]:
            """
            Create a new pipeline.

            Args:
                pipeline_yaml: Pipeline YAML definition

            Returns:
                Created pipeline data
            """
            endpoint = "/pipeline/api/pipelines/v2"
            # The client's default Content-Type is already application/yaml,
            # so no per-request header override is needed.
            return self.client._make_request(
                "POST",
                endpoint,
                params=self._scope_params(),
                data=pipeline_yaml,
            )

        def get_pipeline(self, pipeline_id: str) -> Dict[str, Any]:
            """
            Get pipeline by identifier.

            Args:
                pipeline_id: Pipeline identifier

            Returns:
                Pipeline data
            """
            endpoint = f"/pipeline/api/pipelines/v2/{pipeline_id}"
            return self.client._make_request("GET", endpoint, params=self._scope_params())

        def update_pipeline(self, pipeline_id: str, pipeline_yaml: str) -> Dict[str, Any]:
            """
            Update existing pipeline.

            Args:
                pipeline_id: Pipeline identifier
                pipeline_yaml: Updated pipeline YAML

            Returns:
                Updated pipeline data
            """
            endpoint = f"/pipeline/api/pipelines/v2/{pipeline_id}"
            return self.client._make_request(
                "PUT",
                endpoint,
                params=self._scope_params(),
                data=pipeline_yaml,
            )

        def delete_pipeline(self, pipeline_id: str) -> bool:
            """
            Delete pipeline.

            Args:
                pipeline_id: Pipeline identifier

            Returns:
                True if deleted successfully
            """
            endpoint = f"/pipeline/api/pipelines/v2/{pipeline_id}"
            # Raises on failure, so reaching the return means success
            self.client._make_request("DELETE", endpoint, params=self._scope_params())
            return True

        def execute_pipeline(
            self,
            pipeline_id: str,
            runtime_inputs: Optional[Dict[str, Any]] = None,
        ) -> Dict[str, Any]:
            """
            Execute pipeline with optional runtime inputs.

            Args:
                pipeline_id: Pipeline identifier
                runtime_inputs: Runtime input values

            Returns:
                Execution data with execution ID
            """
            endpoint = f"/pipeline/api/pipelines/v2/execute/{pipeline_id}"
            body = {}
            if runtime_inputs:
                body["runtimeInputYaml"] = yaml.safe_dump(runtime_inputs)

            return self.client._make_request(
                "POST",
                endpoint,
                params=self._scope_params(),
                json=body,
            )

        def get_execution_status(self, execution_id: str) -> Dict[str, Any]:
            """
            Get pipeline execution status.

            Args:
                execution_id: Execution identifier

            Returns:
                Execution status data
            """
            endpoint = f"/pipeline/api/pipelines/v2/execution/{execution_id}"
            return self.client._make_request("GET", endpoint, params=self._scope_params())

        def list_pipelines(
            self,
            filters: Optional[Dict[str, Any]] = None,
            page: int = 0,
            size: int = 50,
        ) -> Dict[str, Any]:
            """
            List pipelines with optional filters.

            Args:
                filters: Filter criteria
                page: Page number (0-indexed)
                size: Page size

            Returns:
                Paginated list of pipelines
            """
            endpoint = "/pipeline/api/pipelines/v2/list"
            params = {**self._scope_params(), "page": page, "size": size}
            return self.client._make_request(
                "POST",
                endpoint,
                params=params,
                json=filters or {},
            )


    # Usage
    client = RobustHarnessClient(
        account_id="your_account_id",
        api_key=os.getenv("HARNESS_API_KEY"),
    )
    manager = PipelineManager(client, "default", "my_project")

    # Create pipeline
    pipeline_yaml = """
    pipeline:
      identifier: my_new_pipeline
      name: My New Pipeline
      projectIdentifier: my_project
      orgIdentifier: default
      stages:
        - stage:
            name: Deploy
            identifier: deploy
            type: Deployment
    """
    created = manager.create_pipeline(pipeline_yaml)
    print(f"Created pipeline: {created['data']['identifier']}")

    # Execute pipeline
    execution = manager.execute_pipeline("my_new_pipeline", {
        "environment": "production"
    })
    execution_id = execution['data']['planExecution']['uuid']
    print(f"Started execution: {execution_id}")

    # Monitor execution
    status = manager.get_execution_status(execution_id)
    print(f"Execution status: {status['data']['pipelineExecutionSummary']['status']}")

    # List all pipelines
    pipelines = manager.list_pipelines(page=0, size=100)
    print(f"Found {len(pipelines['data']['content'])} pipelines")

    # Update pipeline
    updated_yaml = pipeline_yaml.replace("My New Pipeline", "My Updated Pipeline")
    updated = manager.update_pipeline("my_new_pipeline", updated_yaml)
    print(f"Updated pipeline: {updated['data']['name']}")

    # Delete pipeline
    deleted = manager.delete_pipeline("my_new_pipeline")
    print(f"Deleted: {deleted}")
Request/Response Patterns
YAML Body Formatting
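Harness APIs accept YAML for resource definitions such as pipelines. Malformed YAML or incorrect indentation causes the request to be rejected with 400 Bad Request, so generate the body with a YAML library rather than by hand.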
```python
import yaml
from typing import Dict, Any


def create_pipeline_yaml(config: Dict[str, Any]) -> str:
    """
    Generate properly formatted pipeline YAML.

    Args:
        config: Pipeline configuration dictionary

    Returns:
        YAML string
    """
    pipeline_dict = {
        "pipeline": {
            "identifier": config["identifier"],
            "name": config["name"],
            "projectIdentifier": config["project_id"],
            "orgIdentifier": config["org_id"],
            "stages": config.get("stages", [])
        }
    }

    # Use safe_dump with proper formatting
    return yaml.safe_dump(
        pipeline_dict,
        default_flow_style=False,
        sort_keys=False,
        indent=2
    )


# Usage
config = {
    "identifier": "my_pipeline",
    "name": "My Pipeline",
    "org_id": "default",
    "project_id": "my_project",
    "stages": [
        {
            "stage": {
                "name": "Build",
                "identifier": "build",
                "type": "CI"
            }
        }
    ]
}

pipeline_yaml = create_pipeline_yaml(config)
print(pipeline_yaml)
```
Pagination Handling
```python
from typing import Iterator, Dict, Any


def paginate_pipelines(
    manager: PipelineManager,
    page_size: int = 100
) -> Iterator[Dict[str, Any]]:
    """
    Generator to iterate through all pipelines with pagination.

    Args:
        manager: PipelineManager instance
        page_size: Number of items per page

    Yields:
        Pipeline data dictionaries
    """
    page = 0

    while True:
        response = manager.list_pipelines(page=page, size=page_size)

        pipelines = response['data']['content']
        if not pipelines:
            break

        for pipeline in pipelines:
            yield pipeline

        # Check if there are more pages
        total_pages = response['data']['totalPages']
        if page >= total_pages - 1:
            break

        page += 1


# Usage
for pipeline in paginate_pipelines(manager, page_size=50):
    print(f"Pipeline: {pipeline['identifier']}")
```
Best Practices
1. Secure Credential Management
Do:
- Store API keys in environment variables or secrets managers
- Rotate credentials regularly
- Use service accounts with minimal required permissions
- Implement credential refresh for long-running services
Don't:
- Commit credentials to version control
- Share credentials across environments
- Use overly permissive API keys
- Log credentials in error messages
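The first and last points above can be sketched as follows. This is a minimal illustration, assuming the key lives in the `HARNESS_API_KEY` environment variable used elsewhere in this guide; the helper names `load_api_key` and `redact` are hypothetical and not part of any Harness SDK.

```python
import os


def load_api_key(var_name: str = "HARNESS_API_KEY") -> str:
    """Read the API key from the environment and fail fast if it is missing."""
    value = os.environ.get(var_name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {var_name} is not set")
    return value


def redact(secret: str, visible: int = 4) -> str:
    """Return a log-safe form of a secret, keeping only the last few characters."""
    if len(secret) <= visible:
        return "*" * len(secret)
    return "*" * (len(secret) - visible) + secret[-visible:]
```

Passing `redact(api_key)` to log statements instead of the raw value keeps enough of the key visible to tell two keys apart without exposing it.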
2. Rate Limit Optimization
Do:
- Implement request queuing for batch operations
- Use exponential backoff with jitter
- Monitor rate limit headers proactively
- Cache frequently accessed data
- Use webhooks instead of polling when possible
Don't:
- Ignore rate limit headers
- Retry immediately after 429
- Make unnecessary duplicate requests
- Use fixed retry intervals
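As an illustration of "exponential backoff with jitter", here is a small sketch of the full-jitter variant, where each delay is drawn uniformly between zero and an exponentially growing ceiling. The function name `backoff_delay` and the default `base` and `cap` values are assumptions chosen for the example, not Harness recommendations.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Full-jitter delay: uniform in [0, min(cap, base * 2**attempt)] seconds."""
    source = rng or random  # fall back to the module-level generator
    ceiling = min(cap, base * (2 ** attempt))
    return source.uniform(0, ceiling)
```

Randomizing the delay spreads retries from many clients over time, so they do not all hit the API again at the same instant after a 429.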
3. Error Handling Strategy
Do:
- Log errors with correlation IDs
- Implement circuit breakers for cascading failures
- Use structured error logging
- Provide fallback mechanisms
- Distinguish between transient and permanent errors
Don't:
- Swallow errors silently
- Retry on permanent errors (most 4xx responses; 429 and 408 are the usual exceptions)
- Expose internal error details to end users
- Assume all 5xx errors are retryable
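One way to distinguish transient from permanent errors is a small classification helper. The sketch below is an assumed policy, not documented Harness behavior: it always retries 429, retries a short list of transient statuses only for idempotent methods, and treats everything else (including 501) as permanent. The name `is_retryable` is hypothetical.

```python
IDEMPOTENT_METHODS = {"GET", "HEAD", "PUT", "DELETE", "OPTIONS"}
TRANSIENT_STATUS = {408, 500, 502, 503, 504}


def is_retryable(status_code: int, method: str) -> bool:
    """Decide whether a failed request is worth retrying."""
    if status_code == 429:
        # Rate limited: retry after backing off, whatever the method.
        return True
    if status_code in TRANSIENT_STATUS:
        # Only repeat requests that are safe to send twice.
        return method.upper() in IDEMPOTENT_METHODS
    return False
```

Under this policy a POST that fails with 503 is not retried automatically, because the server may already have acted on it; the caller has to decide whether a duplicate is acceptable.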
4. API Request Optimization
Do:
- Use batch endpoints when available
- Request only needed fields
- Implement client-side caching
- Use conditional requests (ETags)
- Compress request/response bodies
Don't:
- Fetch entire resource graphs unnecessarily
- Poll for status updates too frequently
- Make sequential requests that could be parallel
- Ignore pagination for large result sets
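Client-side caching can be as simple as a time-to-live map in front of read-only calls. The sketch below is a minimal, single-threaded illustration; the class name `TTLCache` and the injectable `clock` parameter are assumptions made so the example can be tested without sleeping.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Cache values for a fixed number of seconds."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self._store: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_fetch(self, key: Hashable, fetch: Callable[[], Any]) -> Any:
        """Return the cached value for key, calling fetch() if it is missing or stale."""
        now = self.clock()
        entry = self._store.get(key)
        if entry is not None and now - entry[0] < self.ttl:
            return entry[1]
        value = fetch()
        self._store[key] = (now, value)
        return value
```

For example, `cache.get_or_fetch(pipeline_id, lambda: manager.get_pipeline(pipeline_id))` would avoid repeated GET requests for the same pipeline within the TTL window.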
5. Monitoring and Observability
Do:
- Track API latency and error rates
- Monitor rate limit usage
- Log correlation IDs for debugging
- Set up alerts for error thresholds
- Use structured logging
Don't:
- Rely on client-side logs alone
- Ignore slow API responses
- Skip correlation ID tracking
- Log sensitive data
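A structured log entry that carries a correlation ID and leaves out sensitive headers might look like the sketch below. The function name `format_api_event` and the field names are hypothetical, and where the correlation ID comes from in a response is an assumption to verify against your own API responses.

```python
import json
from typing import Dict, Optional

# Header names whose values must never reach the logs.
SENSITIVE_HEADERS = {"x-api-key", "authorization"}


def format_api_event(
    method: str,
    endpoint: str,
    status_code: int,
    latency_ms: float,
    correlation_id: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
) -> str:
    """Build one JSON log line describing an API call."""
    safe_headers = {
        name: "[REDACTED]" if name.lower() in SENSITIVE_HEADERS else value
        for name, value in (headers or {}).items()
    }
    event = {
        "method": method,
        "endpoint": endpoint,
        "status_code": status_code,
        "latency_ms": latency_ms,
        "correlation_id": correlation_id,
        "headers": safe_headers,
    }
    return json.dumps(event, sort_keys=True)
```

Emitting one JSON object per call makes latency and error-rate dashboards a matter of filtering on `status_code` and `latency_ms`.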
Related Skills
- [[infrastructure-template-generator]] - Generate Harness templates programmatically
- [[harness-pipeline-builder]] - Build pipelines using API
- [[error-recovery-patterns]] - Advanced error recovery strategies
- [[api-client-design]] - Design robust API clients
- [[rate-limit-strategies]] - Advanced rate limiting patterns
- [[authentication-patterns]] - Secure authentication implementations
- [[observability-patterns]] - Monitoring and logging best practices
Troubleshooting
Common Issues and Solutions
Issue: Authentication Failures
Symptoms:
- 401 Unauthorized responses
- "Invalid API key" errors
Solutions:
- Verify API key format: `pat.{account_id}.{token}`
- Check API key permissions and scope
- Ensure account ID matches API key
- Verify token hasn't expired
- Check for leading/trailing whitespace in credentials
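Several of these checks can run locally before any request is sent. The sketch below only tests the `pat.{account_id}.{token}` shape described above; the function name `check_api_key` is hypothetical, and other token types may use a different prefix, which is why the prefix is a parameter.

```python
from typing import List


def check_api_key(api_key: str, account_id: str, expected_prefix: str = "pat") -> List[str]:
    """Return a list of problems found in the key; an empty list means none."""
    problems = []
    if api_key != api_key.strip():
        problems.append("leading or trailing whitespace")
    parts = api_key.strip().split(".")
    if len(parts) < 3 or parts[0] != expected_prefix or not all(parts):
        problems.append(f"does not match {expected_prefix}.{{account_id}}.{{token}}")
    elif parts[1] != account_id:
        problems.append("account ID in key does not match the configured account")
    return problems
```

A key that passes these checks can still be rejected with 401 if it has expired or lacks permissions; those conditions can only be confirmed in the Harness UI or through the API response.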
Issue: Rate Limit Exceeded
Symptoms:
- 429 Too Many Requests
- Retry-After headers in responses
Solutions:
- Implement exponential backoff with jitter
- Use request queuing
- Cache frequently accessed data
- Batch operations when possible
- Monitor rate limit headers proactively
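When a 429 response carries a `Retry-After` header, that value is a better wait time than a guessed one. HTTP allows the header to be either a number of seconds or an HTTP date, so the sketch below handles both; the function name `retry_after_seconds` is hypothetical, and which form Harness sends is not stated here.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(
    header_value: Optional[str],
    now: Optional[datetime] = None,
) -> Optional[float]:
    """Convert a Retry-After header into seconds to wait, or None if unusable."""
    if not header_value:
        return None
    value = header_value.strip()
    if value.isdigit():
        return float(value)  # delta-seconds form, e.g. "120"
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    current = now or datetime.now(timezone.utc)
    return max(0.0, (when - current).total_seconds())
```

If the function returns `None`, fall back to exponential backoff with jitter.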
Issue: Intermittent 500/502/503 Errors
Symptoms:
- Occasional server errors
- Gateway timeout responses
Solutions:
- Implement retry logic with backoff
- Use circuit breaker pattern
- Add request timeout configuration
- Check Harness status page for incidents
- Contact Harness support with correlation IDs
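The circuit breaker pattern mentioned above stops sending requests after repeated failures and allows a trial request once a cool-down has passed. The sketch below is a minimal, single-threaded version; the class name, thresholds, and the injectable `clock` are illustrative assumptions.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Open the circuit after consecutive failures; allow a trial call after a cool-down."""

    def __init__(
        self,
        failure_threshold: int = 5,
        reset_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        """Return True if a request may be sent right now."""
        if self.opened_at is None:
            return True
        # Half-open: after the cool-down, let a trial request through.
        return self.clock() - self.opened_at >= self.reset_timeout

    def record_success(self) -> None:
        """Close the circuit and reset the failure count."""
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        """Count a failure and open the circuit once the threshold is reached."""
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()
```

A caller checks `allow_request()` before each API call, then reports the outcome with `record_success()` or `record_failure()`.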
Issue: YAML Parsing Errors
Symptoms:
- 400 Bad Request
- "Invalid YAML" error messages
Solutions:
- Validate YAML syntax before sending
- Check for special characters needing escaping
- Ensure proper indentation
- Use YAML libraries instead of string concatenation
- Validate against Harness schema
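A cheap pre-flight check on the pipeline dictionary, before it is serialized to YAML, catches missing fields without a round trip to the API. The sketch below only checks the keys used in the pipeline examples in this guide; it is not the Harness schema, and the function name `find_pipeline_problems` is hypothetical.

```python
from typing import Any, List

# Keys that the pipeline examples in this guide always set.
REQUIRED_PIPELINE_KEYS = ("identifier", "name", "projectIdentifier", "orgIdentifier")


def find_pipeline_problems(doc: Any) -> List[str]:
    """Return a list of structural problems; an empty list means none were found."""
    if not isinstance(doc, dict) or not isinstance(doc.get("pipeline"), dict):
        return ["top-level 'pipeline' mapping is missing"]
    pipeline = doc["pipeline"]
    problems = [
        f"pipeline.{key} is missing or empty"
        for key in REQUIRED_PIPELINE_KEYS
        if not pipeline.get(key)
    ]
    if not isinstance(pipeline.get("stages"), list):
        problems.append("pipeline.stages must be a list")
    return problems
```

Running this on the dictionary and then serializing with `yaml.safe_dump` avoids both structural gaps and hand-written indentation mistakes.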
Version: 1.0.0
Last Updated: 2026-01-19
Skill Type: API Integration & Operations
Complexity: Intermediate to Advanced