Asi atproto-ingest
Layer 1 - Data Acquisition for Bluesky/AT Protocol social graph and content.
install
source · Clone the upstream repo
git clone https://github.com/plurigrid/asi
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/plurigrid/asi "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/atproto-ingest" ~/.claude/skills/plurigrid-asi-atproto-ingest-41ac34 && rm -rf "$T"
manifest:
skills/atproto-ingest/SKILL.mdsource content
AT Protocol Data Ingestion
Layer 1 - Data Acquisition for Bluesky/AT Protocol social graph and content.
GF(3) Trit: +1 (Generator) — Produces data streams for downstream processing.
Authentication
App Password (Recommended for scripts)
# Create session curl -X POST https://bsky.social/xrpc/com.atproto.server.createSession \ -H "Content-Type: application/json" \ -d '{"identifier": "handle.bsky.social", "password": "app-password-here"}' # Response contains accessJwt and refreshJwt export BSKY_TOKEN="eyJ..."
OAuth (For apps)
# Authorization endpoint: https://bsky.social/oauth/authorize # Token endpoint: https://bsky.social/oauth/token # Scopes: atproto, transition:generic
Capabilities
1. fetch-user-posts
Get all posts from a user by handle or DID.
# Resolve handle to DID curl "https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle=user.bsky.social" # Get author feed (paginated) curl "https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed?actor=did:plc:xxx&limit=100&cursor=$CURSOR" \ -H "Authorization: Bearer $BSKY_TOKEN"
Pagination loop:
def fetch_all_posts(actor: str, token: str) -> list: posts, cursor = [], None while True: url = f"https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed?actor={actor}&limit=100" if cursor: url += f"&cursor={cursor}" resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}) data = resp.json() posts.extend(data.get("feed", [])) cursor = data.get("cursor") if not cursor: break time.sleep(0.5) # Rate limit respect return posts
2. get-engagement-graph
Map likes, reposts, replies, and quotes for a post.
# Get likes curl "https://bsky.social/xrpc/app.bsky.feed.getLikes?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&limit=100" \ -H "Authorization: Bearer $BSKY_TOKEN" # Get reposts curl "https://bsky.social/xrpc/app.bsky.feed.getRepostedBy?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&limit=100" \ -H "Authorization: Bearer $BSKY_TOKEN" # Get quotes (search for post URI) curl "https://bsky.social/xrpc/app.bsky.feed.getQuotes?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&limit=100" \ -H "Authorization: Bearer $BSKY_TOKEN"
Engagement record schema:
CREATE TABLE engagement ( post_uri VARCHAR PRIMARY KEY, author_did VARCHAR, like_count INTEGER, repost_count INTEGER, reply_count INTEGER, quote_count INTEGER, likers VARCHAR[], -- Array of DIDs reposters VARCHAR[], indexed_at TIMESTAMP );
3. stream-mentions
Real-time monitoring via Firehose or polling.
# Firehose (WebSocket) - all network events wscat -c wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos # Polling fallback - notifications endpoint curl "https://bsky.social/xrpc/app.bsky.notification.listNotifications?limit=50" \ -H "Authorization: Bearer $BSKY_TOKEN"
Firehose filter (Python):
import websocket import cbor2 def on_message(ws, message): header, body = cbor2.loads(message) if header.get("op") == 1: # Commit for op in body.get("ops", []): if "app.bsky.feed.post" in op.get("path", ""): record = op.get("record", {}) # Check for mentions in facets for facet in record.get("facets", []): for feature in facet.get("features", []): if feature.get("$type") == "app.bsky.richtext.facet#mention": if feature.get("did") == TARGET_DID: yield record ws = websocket.WebSocketApp("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", on_message=on_message)
4. extract-thread-tree
Full conversation tree from a post.
curl "https://bsky.social/xrpc/app.bsky.feed.getPostThread?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&depth=10&parentHeight=10" \ -H "Authorization: Bearer $BSKY_TOKEN"
Thread tree structure:
def extract_thread_tree(thread_data: dict) -> dict: """Flatten thread to adjacency list for DuckDB.""" nodes = [] edges = [] def walk(node, parent_uri=None): post = node.get("post", {}) uri = post.get("uri") nodes.append({ "uri": uri, "author_did": post.get("author", {}).get("did"), "text": post.get("record", {}).get("text"), "created_at": post.get("record", {}).get("createdAt"), "depth": node.get("depth", 0) }) if parent_uri: edges.append({"parent": parent_uri, "child": uri}) for reply in node.get("replies", []): walk(reply, uri) walk(thread_data.get("thread", {})) return {"nodes": nodes, "edges": edges}
5. get-follower-network
First and second-order connections.
# Followers curl "https://bsky.social/xrpc/app.bsky.graph.getFollowers?actor=did:plc:xxx&limit=100&cursor=$CURSOR" \ -H "Authorization: Bearer $BSKY_TOKEN" # Following curl "https://bsky.social/xrpc/app.bsky.graph.getFollows?actor=did:plc:xxx&limit=100&cursor=$CURSOR" \ -H "Authorization: Bearer $BSKY_TOKEN"
Second-order expansion:
def get_follower_network(actor: str, depth: int = 2) -> dict: """BFS expansion of follower graph.""" visited = set() edges = [] queue = [(actor, 0)] while queue: current, d = queue.pop(0) if current in visited or d > depth: continue visited.add(current) followers = paginate_all("app.bsky.graph.getFollowers", actor=current) for f in followers: did = f["did"] edges.append({"from": did, "to": current, "type": "follows"}) if d < depth: queue.append((did, d + 1)) return {"nodes": list(visited), "edges": edges}
Rate Limiting
| Endpoint Category | Rate Limit | Window |
|---|---|---|
| Read (feed, graph) | 3000/5min | Rolling |
| Write (post, like) | 1500/hr | Rolling |
| Firehose | Unlimited | - |
| Search | 100/min | Rolling |
Backoff strategy:
def rate_limited_request(url, headers, max_retries=5): for attempt in range(max_retries): resp = requests.get(url, headers=headers) if resp.status_code == 429: wait = int(resp.headers.get("Retry-After", 2 ** attempt)) time.sleep(wait) continue return resp raise Exception("Rate limit exceeded")
DuckDB Ingestion Format
-- Posts table CREATE TABLE bsky_posts ( uri VARCHAR PRIMARY KEY, cid VARCHAR, author_did VARCHAR, text TEXT, created_at TIMESTAMP, reply_parent VARCHAR, reply_root VARCHAR, embed_type VARCHAR, langs VARCHAR[], labels VARCHAR[], ingested_at TIMESTAMP DEFAULT now() ); -- Social graph CREATE TABLE bsky_graph ( subject_did VARCHAR, object_did VARCHAR, relation VARCHAR, -- 'follows', 'blocks', 'mutes' created_at TIMESTAMP, PRIMARY KEY (subject_did, object_did, relation) ); -- Engagement events CREATE TABLE bsky_engagement ( uri VARCHAR, actor_did VARCHAR, action VARCHAR, -- 'like', 'repost', 'quote', 'reply' target_uri VARCHAR, created_at TIMESTAMP, PRIMARY KEY (uri) ); -- Insert from JSON COPY bsky_posts FROM 'posts.json' (FORMAT JSON, ARRAY true);
AT Protocol Lexicon Reference
| Lexicon | Purpose |
|---|---|
| User's posts |
| Thread tree |
| Who liked |
| Who reposted |
| Follower list |
| Following list |
| User profile |
| Firehose |
Integration with Layer 2
Output feeds into:
- GF(3) Trit 0 (Transformer): Sentiment analysis, embedding generation
- GF(3) Trit -1 (Consumer): Dashboard display, alert triggers
# Pipeline handoff posts = fetch_all_posts(actor, token) duckdb.execute("INSERT INTO bsky_posts SELECT * FROM read_json(?)", [posts]) # Signal next layer publish_event("bsky.ingestion.complete", {"actor": actor, "count": len(posts)})