Claude-skill-registry community-feed
Social feed with batch queries, cursor pagination, trending algorithms, and engagement tracking. Efficient database queries for infinite scroll feeds.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/community-feed" ~/.claude/skills/majiayu000-claude-skill-registry-community-feed && rm -rf "$T"
manifest:
skills/data/community-feed/SKILL.md
source content
Community Feed
Social feed with trending algorithms, cursor pagination, and engagement tracking.
When to Use This Skill
- Building social feeds with infinite scroll
- Need trending/hot content algorithms
- Implementing like/engagement systems
- Want efficient pagination for large datasets
Core Concepts
Cursor pagination beats offset for large datasets. Batch-load relationships to avoid N+1. Store trending scores as computed columns for efficient sorting.
Implementation
Python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, List, Dict
import base64
import json


@dataclass
class PaginatedPosts:
    """One page of feed results plus what is needed to request the next page."""

    posts: List[Dict]  # post rows for this page (at most ``limit`` items)
    total_count: int  # number of posts matching the feed filters
    has_more: bool  # True when at least one more post exists after this page
    next_cursor: Optional[str]  # opaque cursor for the next page, or None


class CommunityFeedService:
    """Service for community feed with cursor pagination."""

    def __init__(self, db):
        # NOTE(review): ``db`` is used like a Supabase/PostgREST-style client
        # (``table().select()...execute()`` and ``rpc()``) -- confirm.
        self.db = db

    async def get_feed(
        self,
        feed_type: str = "trending",
        viewer_id: Optional[str] = None,
        cursor: Optional[str] = None,
        limit: int = 20,
        tags: Optional[List[str]] = None,
    ) -> PaginatedPosts:
        """Return one page of the feed.

        Args:
            feed_type: "trending" orders by ``trending_score``; any other value
                orders by ``created_at``. "following" additionally restricts
                posts to users followed by ``viewer_id`` (when it is given).
            viewer_id: If set, each post gets an ``is_liked_by_viewer`` flag.
            cursor: Value of ``next_cursor`` from a previous page.
            limit: Maximum number of posts to return.
            tags: If set, only posts whose ``tags`` contain these are returned.

        Returns:
            A ``PaginatedPosts`` page.
        """
        # An unreadable cursor parses to {} and is treated as "first page".
        cursor_data = self._parse_cursor(cursor) if cursor else {}

        query = self.db.table("community_posts").select(
            "*",
            "assets!inner(url, asset_type)",
            "users!inner(id, display_name, avatar_url)",
        )

        if tags:
            query = query.contains("tags", tags)

        # Feed-specific filtering
        following: Optional[List[str]] = None
        if feed_type == "following" and viewer_id:
            following = await self._get_following_ids(viewer_id)
            if not following:
                return PaginatedPosts(
                    posts=[], total_count=0, has_more=False, next_cursor=None
                )
            query = query.in_("user_id", following)

        # Feed-specific ordering. The cursor key is checked explicitly so a
        # cursor minted for the other feed type is ignored instead of raising
        # KeyError.
        if feed_type == "trending":
            query = query.order("trending_score", desc=True)
            if "score" in cursor_data:
                # NOTE(review): a strict ``lt`` skips posts whose score equals
                # the cursor's score; a (score, id) composite cursor would be
                # needed to page through ties.
                query = query.lt("trending_score", cursor_data["score"])
        else:
            query = query.order("created_at", desc=True)
            if "created_at" in cursor_data:
                query = query.lt("created_at", cursor_data["created_at"])

        # Fetch one extra to check has_more
        result = query.limit(limit + 1).execute()
        posts = result.data or []
        has_more = len(posts) > limit
        if has_more:
            posts = posts[:limit]

        # Batch load viewer's likes (one query for the whole page, not N+1)
        if viewer_id and posts:
            liked_ids = await self._get_liked_post_ids(
                viewer_id, [p["id"] for p in posts]
            )
            for post in posts:
                post["is_liked_by_viewer"] = post["id"] in liked_ids

        next_cursor = None
        if has_more and posts:
            next_cursor = self._generate_cursor(posts[-1], feed_type)

        return PaginatedPosts(
            posts=posts,
            total_count=await self._get_total_count(tags, following),
            has_more=has_more,
            next_cursor=next_cursor,
        )

    async def _get_total_count(
        self,
        tags: Optional[List[str]] = None,
        user_ids: Optional[List[str]] = None,
    ) -> int:
        """Count posts matching the same tag/author filters as the feed query.

        NOTE(review): assumes the client accepts ``select(..., count="exact")``
        and exposes the total on ``response.count`` (Supabase/PostgREST style)
        -- confirm against the client in use. Exact counts are expensive on
        large tables; consider caching the result.
        """
        query = self.db.table("community_posts").select("id", count="exact")
        if tags:
            query = query.contains("tags", tags)
        if user_ids:
            query = query.in_("user_id", user_ids)
        # Only the count is needed, so keep the returned row set minimal.
        result = query.limit(1).execute()
        return getattr(result, "count", None) or 0

    async def _get_following_ids(self, user_id: str) -> List[str]:
        """Return the ``following_id`` values of ``user_follows`` rows for ``user_id``."""
        result = (
            self.db.table("user_follows")
            .select("following_id")
            .eq("follower_id", user_id)
            .execute()
        )
        return [r["following_id"] for r in (result.data or [])]

    async def _get_liked_post_ids(self, user_id: str, post_ids: List[str]) -> set:
        """Return the subset of ``post_ids`` that ``user_id`` has liked."""
        result = (
            self.db.table("post_likes")
            .select("post_id")
            .eq("user_id", user_id)
            .in_("post_id", post_ids)
            .execute()
        )
        return {r["post_id"] for r in (result.data or [])}

    def _parse_cursor(self, cursor: str) -> dict:
        """Decode a base64-encoded JSON cursor; return {} if it is not valid.

        Cursors come from clients, so anything that is not a base64-encoded
        JSON object is treated as "no cursor" rather than raising.
        """
        try:
            data = json.loads(base64.b64decode(cursor).decode())
        except (ValueError, TypeError):
            # ValueError covers bad base64, bad UTF-8 and bad JSON;
            # TypeError covers a non-string cursor.
            return {}
        return data if isinstance(data, dict) else {}

    def _generate_cursor(self, post: dict, feed_type: str) -> str:
        """Encode the sort key of ``post`` as a base64 JSON cursor."""
        if feed_type == "trending":
            data = {"score": post.get("trending_score", 0)}
        else:
            data = {"created_at": post["created_at"]}
        return base64.b64encode(json.dumps(data).encode()).decode()

    # Engagement operations

    async def like_post(self, post_id: str, user_id: str) -> bool:
        """Record a like; return False if the user had already liked the post."""
        # NOTE(review): check-then-insert is not atomic; two concurrent calls
        # can both pass this check. A unique constraint on
        # (post_id, user_id) would be needed to rule out duplicates.
        existing = (
            self.db.table("post_likes")
            .select("id")
            .eq("post_id", post_id)
            .eq("user_id", user_id)
            .execute()
        )
        if existing.data:
            return False  # Already liked

        self.db.table("post_likes").insert({
            "post_id": post_id,
            "user_id": user_id,
            "created_at": datetime.now(timezone.utc).isoformat(),
        }).execute()

        # Atomic increment
        self.db.rpc("increment_like_count", {"post_id": post_id}).execute()
        return True

    async def unlike_post(self, post_id: str, user_id: str) -> bool:
        """Remove a like; return False if no like row was deleted."""
        result = (
            self.db.table("post_likes")
            .delete()
            .eq("post_id", post_id)
            .eq("user_id", user_id)
            .execute()
        )
        if not result.data:
            return False

        self.db.rpc("decrement_like_count", {"post_id": post_id}).execute()
        return True


# Trending algorithm
def calculate_trending_score(
    like_count: int,
    comment_count: int,
    view_count: int,
    created_at: datetime,
    is_featured: bool = False,
) -> float:
    """
    Trending score = engagement / age^decay
    Higher engagement + newer = higher score

    Engagement weights likes x1.0, comments x2.0 and views x0.1. Age is
    measured in hours and floored at 0.1 so brand-new (or future-dated) posts
    do not divide by zero. Featured posts get a 1.5x boost.

    A naive ``created_at`` is interpreted as UTC so it can be compared with
    the timezone-aware current time.
    """
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)

    engagement = like_count * 1.0 + comment_count * 2.0 + view_count * 0.1
    age_hours = max(
        (datetime.now(timezone.utc) - created_at).total_seconds() / 3600, 0.1
    )
    score = engagement / (age_hours ** 1.5)
    return score * 1.5 if is_featured else score
SQL Schema
CREATE TABLE community_posts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    title VARCHAR(200) NOT NULL,
    like_count INTEGER DEFAULT 0,
    comment_count INTEGER DEFAULT 0,
    view_count INTEGER DEFAULT 0,
    is_featured BOOLEAN DEFAULT FALSE,
    tags TEXT[] DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- Precomputed trending score. This cannot be a GENERATED ... STORED
    -- column: the formula depends on NOW(), and PostgreSQL only allows
    -- immutable expressions in generated columns. It is a plain column that
    -- refresh_trending_scores() (below) recomputes.
    trending_score FLOAT NOT NULL DEFAULT 0
);

CREATE INDEX idx_posts_trending ON community_posts(trending_score DESC);
CREATE INDEX idx_posts_created ON community_posts(created_at DESC);

-- Recompute trending_score for every post; call this on a schedule.
-- Score = engagement / age_hours^1.5, age floored at 0.1 h, with a 1.5x
-- boost for featured posts (same formula as calculate_trending_score in the
-- Python service).
CREATE FUNCTION refresh_trending_scores() RETURNS VOID AS $$
BEGIN
    UPDATE community_posts
    SET trending_score =
        (like_count + comment_count * 2 + view_count * 0.1)
        / POWER(GREATEST(EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600, 0.1), 1.5)
        * (CASE WHEN is_featured THEN 1.5 ELSE 1 END);
END;
$$ LANGUAGE plpgsql;

-- Atomic increment functions
CREATE FUNCTION increment_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN
    UPDATE community_posts SET like_count = like_count + 1 WHERE id = post_id;
END;
$$ LANGUAGE plpgsql;

-- Counterpart used when a like is removed; never drops below zero.
CREATE FUNCTION decrement_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN
    UPDATE community_posts
    SET like_count = GREATEST(like_count - 1, 0)
    WHERE id = post_id;
END;
$$ LANGUAGE plpgsql;
Usage Examples
async def main(db) -> None:
    """Fetch two pages of the trending feed, then like a post.

    Wrapped in a coroutine because ``await`` is not valid at module top level;
    run it with ``asyncio.run(main(db))``.
    """
    feed_service = CommunityFeedService(db)

    # Get trending feed
    result = await feed_service.get_feed(
        feed_type="trending", viewer_id="user_123", limit=20
    )
    for post in result.posts:
        print(f"{post['title']} - {post['like_count']} likes")

    # Load next page. Pass the same viewer_id and limit as the first request
    # so the page size and the is_liked_by_viewer flags stay consistent.
    if result.has_more:
        next_page = await feed_service.get_feed(
            feed_type="trending",
            viewer_id="user_123",
            cursor=result.next_cursor,
            limit=20,
        )

    # Like a post
    await feed_service.like_post("post_456", "user_123")
Best Practices
- Use cursor pagination over offset for large datasets
- Batch-load relationships to avoid N+1 queries
- Store trending score as computed column for efficient sorting
- Use atomic database functions for counter updates
- Cache total counts (expensive to compute)
Common Mistakes
- Using offset pagination (slow for large offsets)
- N+1 queries for author/like data
- Computing trending score on every query
- Non-atomic counter updates (race conditions)
Related Patterns
- analytics-pipeline (event tracking)
- intelligent-cache (caching feeds)