Awesome-omni-skill appwrite-python
Appwrite Python SDK skill. Use when building server-side Python applications with Appwrite, including Django, Flask, and FastAPI integrations. Covers user management, database/table CRUD, file storage, and functions via API keys.
git clone https://github.com/diegosouzapw/awesome-omni-skill
T=$(mktemp -d) && git clone --depth=1 https://github.com/diegosouzapw/awesome-omni-skill "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/development/appwrite-python" ~/.claude/skills/diegosouzapw-awesome-omni-skill-appwrite-python-102b79 && rm -rf "$T"
skills/development/appwrite-python/SKILL.mdAppwrite Python SDK
Installation
pip install appwrite
Setting Up the Client
from appwrite.client import Client from appwrite.id import ID from appwrite.query import Query from appwrite.services.users import Users from appwrite.services.tablesdb import TablesDB from appwrite.services.storage import Storage from appwrite.services.functions import Functions from appwrite.enums.o_auth_provider import OAuthProvider import os client = (Client() .set_endpoint('https://<REGION>.cloud.appwrite.io/v1') .set_project(os.environ['APPWRITE_PROJECT_ID']) .set_key(os.environ['APPWRITE_API_KEY']))
Code Examples
User Management
users = Users(client) # Create user user = users.create(ID.unique(), 'user@example.com', None, 'password123', 'User Name') # List users result = users.list([Query.limit(25)]) # Get user fetched = users.get('[USER_ID]') # Delete user users.delete('[USER_ID]')
Database Operations
Note: Use
(not the deprecatedTablesDBclass) for all new code. Only useDatabasesif the existing codebase already relies on it or the user explicitly requests it.DatabasesTip: Prefer keyword arguments (e.g.,
) over positional arguments for all SDK method calls. Only use positional style if the existing codebase already uses it or the user explicitly requests it.database_id='...'
tables_db = TablesDB(client) # Create database db = tables_db.create(ID.unique(), 'My Database') # Create row doc = tables_db.create_row('[DATABASE_ID]', '[TABLE_ID]', ID.unique(), { 'title': 'Hello World' }) # Query rows results = tables_db.list_rows('[DATABASE_ID]', '[TABLE_ID]', [ Query.equal('title', 'Hello World'), Query.limit(10) ]) # Get row row = tables_db.get_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]') # Update row tables_db.update_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]', { 'title': 'Updated' }) # Delete row tables_db.delete_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
String Column Types
Note: The legacy
type is deprecated. Use explicit column types for all new columns.string
| Type | Max characters | Indexing | Storage |
|---|---|---|---|
| 16,383 | Full index (if size ≤ 768) | Inline in row |
| 16,383 | Prefix only | Off-page |
| 4,194,303 | Prefix only | Off-page |
| 1,073,741,823 | Prefix only | Off-page |
is stored inline and counts towards the 64 KB row size limit. Prefer for short, indexed fields like names, slugs, or identifiers.varchar
,text
, andmediumtext
are stored off-page (only a 20-byte pointer lives in the row), so they don't consume the row size budget.longtext
is not required for these types.size
# Create table with explicit string column types tables_db.create_table( database_id='[DATABASE_ID]', table_id=ID.unique(), name='articles', columns=[ {'key': 'title', 'type': 'varchar', 'size': 255, 'required': True}, # inline, fully indexable {'key': 'summary', 'type': 'text', 'required': False}, # off-page, prefix index only {'key': 'body', 'type': 'mediumtext', 'required': False}, # up to ~4 M chars {'key': 'raw_data', 'type': 'longtext', 'required': False}, # up to ~1 B chars ] )
Query Methods
# Filtering Query.equal('field', 'value') # == (or pass list for IN) Query.not_equal('field', 'value') # != Query.less_than('field', 100) # < Query.less_than_equal('field', 100) # <= Query.greater_than('field', 100) # > Query.greater_than_equal('field', 100) # >= Query.between('field', 1, 100) # 1 <= field <= 100 Query.is_null('field') # is null Query.is_not_null('field') # is not null Query.starts_with('field', 'prefix') # starts with Query.ends_with('field', 'suffix') # ends with Query.contains('field', 'sub') # contains (string or array) Query.search('field', 'keywords') # full-text search (requires index) # Sorting Query.order_asc('field') Query.order_desc('field') # Pagination Query.limit(25) # max rows (default 25, max 100) Query.offset(0) # skip N rows Query.cursor_after('[ROW_ID]') # cursor pagination (preferred) Query.cursor_before('[ROW_ID]') # Selection & Logic Query.select(['field1', 'field2']) # return only specified fields Query.or_queries([Query.equal('a', 1), Query.equal('b', 2)]) # OR Query.and_queries([Query.greater_than('age', 18), Query.less_than('age', 65)]) # AND (default)
File Storage
from appwrite.input_file import InputFile storage = Storage(client) # Upload file file = storage.create_file('[BUCKET_ID]', ID.unique(), InputFile.from_path('/path/to/file.png')) # List files files = storage.list_files('[BUCKET_ID]') # Delete file storage.delete_file('[BUCKET_ID]', '[FILE_ID]')
InputFile Factory Methods
from appwrite.input_file import InputFile InputFile.from_path('/path/to/file.png') # from filesystem path InputFile.from_bytes(byte_data, 'file.png') # from bytes InputFile.from_string('Hello world', 'hello.txt') # from string content
Teams
from appwrite.services.teams import Teams teams = Teams(client) # Create team team = teams.create(ID.unique(), 'Engineering') # List teams team_list = teams.list() # Create membership (invite user by email) membership = teams.create_membership('[TEAM_ID]', roles=['editor'], email='user@example.com') # List memberships members = teams.list_memberships('[TEAM_ID]') # Update membership roles teams.update_membership('[TEAM_ID]', '[MEMBERSHIP_ID]', roles=['admin']) # Delete team teams.delete('[TEAM_ID]')
Role-based access: Use
for all team members orRole.team('[TEAM_ID]')for a specific team role when setting permissions.Role.team('[TEAM_ID]', 'editor')
Serverless Functions
functions = Functions(client) # Execute function execution = functions.create_execution('[FUNCTION_ID]', body='{"key": "value"}') # List executions executions = functions.list_executions('[FUNCTION_ID]')
Writing a Function Handler (Python runtime)
# src/main.py — Appwrite Function entry point def main(context): # context.req — request object # .body — raw request body (string) # .body_json — parsed JSON body (dict, or None if not JSON) # .headers — request headers (dict) # .method — HTTP method (GET, POST, etc.) # .path — URL path # .query — parsed query parameters (dict) # .query_string — raw query string context.log('Processing: ' + context.req.method + ' ' + context.req.path) if context.req.method == 'GET': return context.res.json({'message': 'Hello from Appwrite Function!'}) data = context.req.body_json or {} if 'name' not in data: context.error('Missing name field') return context.res.json({'error': 'Name is required'}, 400) # Response methods return context.res.json({'success': True}) # JSON response # return context.res.text('Hello') # plain text # return context.res.empty() # 204 No Content # return context.res.redirect('https://example.com') # 302 Redirect # return context.res.send('data', 200, {'X-Custom': '1'}) # custom response
Server-Side Rendering (SSR) Authentication
SSR apps (Flask, Django, FastAPI, etc.) use the server SDK to handle auth. You need two clients:
- Admin client — uses an API key, creates sessions, bypasses rate limits (reusable singleton)
- Session client — uses a session cookie, acts on behalf of a user (create per-request, never share)
from appwrite.client import Client from appwrite.services.account import Account from flask import request, jsonify, make_response, redirect # Admin client (reusable) admin_client = (Client() .set_endpoint('https://<REGION>.cloud.appwrite.io/v1') .set_project('[PROJECT_ID]') .set_key(os.environ['APPWRITE_API_KEY'])) # Session client (create per-request) session_client = (Client() .set_endpoint('https://<REGION>.cloud.appwrite.io/v1') .set_project('[PROJECT_ID]')) session = request.cookies.get('a_session_[PROJECT_ID]') if session: session_client.set_session(session)
Email/Password Login
@app.post('/login') def login(): account = Account(admin_client) session = account.create_email_password_session( request.json['email'], request.json['password'] ) # Cookie name must be a_session_<PROJECT_ID> resp = make_response(jsonify({'success': True})) resp.set_cookie('a_session_[PROJECT_ID]', session['secret'], httponly=True, secure=True, samesite='Strict', expires=session['expire'], path='/') return resp
Authenticated Requests
@app.get('/user') def get_user(): session = request.cookies.get('a_session_[PROJECT_ID]') if not session: return jsonify({'error': 'Unauthorized'}), 401 session_client = (Client() .set_endpoint('https://<REGION>.cloud.appwrite.io/v1') .set_project('[PROJECT_ID]') .set_session(session)) account = Account(session_client) return jsonify(account.get())
OAuth2 SSR Flow
# Step 1: Redirect to OAuth provider @app.get('/oauth') def oauth(): account = Account(admin_client) redirect_url = account.create_o_auth2_token( OAuthProvider.Github, 'https://example.com/oauth/success', 'https://example.com/oauth/failure', ) return redirect(redirect_url) # Step 2: Handle callback — exchange token for session @app.get('/oauth/success') def oauth_success(): account = Account(admin_client) session = account.create_session(request.args['userId'], request.args['secret']) resp = make_response(jsonify({'success': True})) resp.set_cookie('a_session_[PROJECT_ID]', session['secret'], httponly=True, secure=True, samesite='Strict', expires=session['expire'], path='/') return resp
Cookie security: Always use
,httponly, andsecureto prevent XSS. The cookie name must besamesite='Strict'.a_session_<PROJECT_ID>
Forwarding user agent: Call
to record the end-user's browser info for debugging and security.session_client.set_forwarded_user_agent(request.headers.get('user-agent'))
Error Handling
from appwrite.exception import AppwriteException try: row = tables_db.get_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]') except AppwriteException as e: print(e.message) # human-readable error message print(e.code) # HTTP status code (int) print(e.type) # Appwrite error type string (e.g. 'document_not_found') print(e.response) # full response body (dict)
Common error codes:
| Code | Meaning |
|---|---|
| Unauthorized — missing or invalid session/API key |
| Forbidden — insufficient permissions for this action |
| Not found — resource does not exist |
| Conflict — duplicate ID or unique constraint violation |
| Rate limited — too many requests, retry after backoff |
Permissions & Roles (Critical)
Appwrite uses permission strings to control access to resources. Each permission pairs an action (
read, update, delete, create, or write which grants create + update + delete) with a role target. By default, no user has access unless permissions are explicitly set at the document/file level or inherited from the collection/bucket settings. Permissions are arrays of strings built with the Permission and Role helpers.
from appwrite.permission import Permission from appwrite.role import Role
Database Row with Permissions
doc = tables_db.create_row('[DATABASE_ID]', '[TABLE_ID]', ID.unique(), { 'title': 'Hello World' }, [ Permission.read(Role.user('[USER_ID]')), # specific user can read Permission.update(Role.user('[USER_ID]')), # specific user can update Permission.read(Role.team('[TEAM_ID]')), # all team members can read Permission.read(Role.any()), # anyone (including guests) can read ])
File Upload with Permissions
file = storage.create_file('[BUCKET_ID]', ID.unique(), InputFile.from_path('/path/to/file.png'), [ Permission.read(Role.any()), Permission.update(Role.user('[USER_ID]')), Permission.delete(Role.user('[USER_ID]')), ])
When to set permissions: Set document/file-level permissions when you need per-resource access control. If all documents in a collection share the same rules, configure permissions at the collection/bucket level and leave document permissions empty.
Common mistakes:
- Forgetting permissions — the resource becomes inaccessible to all users (including the creator)
withRole.any()/write/update— allows any user, including unauthenticated guests, to modify or remove the resourcedelete on sensitive data — makes the resource publicly readablePermission.read(Role.any())