Awesome-omni-skill appwrite-python

Appwrite Python SDK skill. Use when building server-side Python applications with Appwrite, including Django, Flask, and FastAPI integrations. Covers user management, database/table CRUD, file storage, and functions via API keys.

install
source · Clone the upstream repo
git clone https://github.com/diegosouzapw/awesome-omni-skill
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/diegosouzapw/awesome-omni-skill "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/development/appwrite-python" ~/.claude/skills/diegosouzapw-awesome-omni-skill-appwrite-python-102b79 && rm -rf "$T"
manifest: skills/development/appwrite-python/SKILL.md
source content

Appwrite Python SDK

Installation

pip install appwrite

Setting Up the Client

from appwrite.client import Client
from appwrite.id import ID
from appwrite.query import Query
from appwrite.services.users import Users
from appwrite.services.tablesdb import TablesDB
from appwrite.services.storage import Storage
from appwrite.services.functions import Functions
from appwrite.enums.o_auth_provider import OAuthProvider

import os

# Build one API-key client and pass it to each service class. The project ID
# and API key are read from the environment rather than hard-coded; replace
# <REGION> with your project's region.
client = (Client()
    .set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
    .set_project(os.environ['APPWRITE_PROJECT_ID'])
    .set_key(os.environ['APPWRITE_API_KEY']))

Code Examples

User Management

users = Users(client)

# Create user (keyword arguments make the optional phone slot unnecessary)
user = users.create(
    user_id=ID.unique(),
    email='user@example.com',
    password='password123',
    name='User Name',
)

# List users
result = users.list(queries=[Query.limit(25)])

# Get user
fetched = users.get(user_id='[USER_ID]')

# Delete user
users.delete(user_id='[USER_ID]')

Database Operations

Note: Use `TablesDB` (not the deprecated `Databases` class) for all new code. Only use `Databases` if the existing codebase already relies on it or the user explicitly requests it.

Tip: Prefer keyword arguments (e.g., `database_id='...'`) over positional arguments for all SDK method calls. Only use positional style if the existing codebase already uses it or the user explicitly requests it.

tables_db = TablesDB(client)

# Create database
db = tables_db.create(database_id=ID.unique(), name='My Database')

# Create row
doc = tables_db.create_row(
    database_id='[DATABASE_ID]',
    table_id='[TABLE_ID]',
    row_id=ID.unique(),
    data={'title': 'Hello World'},
)

# Query rows
results = tables_db.list_rows(
    database_id='[DATABASE_ID]',
    table_id='[TABLE_ID]',
    queries=[
        Query.equal('title', 'Hello World'),
        Query.limit(10),
    ],
)

# Get row
row = tables_db.get_row(
    database_id='[DATABASE_ID]',
    table_id='[TABLE_ID]',
    row_id='[ROW_ID]',
)

# Update row
tables_db.update_row(
    database_id='[DATABASE_ID]',
    table_id='[TABLE_ID]',
    row_id='[ROW_ID]',
    data={'title': 'Updated'},
)

# Delete row
tables_db.delete_row(
    database_id='[DATABASE_ID]',
    table_id='[TABLE_ID]',
    row_id='[ROW_ID]',
)

String Column Types

Note: The legacy `string` type is deprecated. Use explicit column types for all new columns.

| Type | Max characters | Indexing | Storage |
| --- | --- | --- | --- |
| `varchar` | 16,383 | Full index (if size ≤ 768) | Inline in row |
| `text` | 16,383 | Prefix only | Off-page |
| `mediumtext` | 4,194,303 | Prefix only | Off-page |
| `longtext` | 1,073,741,823 | Prefix only | Off-page |
  • `varchar` is stored inline and counts towards the 64 KB row size limit. Prefer it for short, indexed fields like names, slugs, or identifiers.
  • `text`, `mediumtext`, and `longtext` are stored off-page (only a 20-byte pointer lives in the row), so they don't consume the row size budget. `size` is not required for these types.
# Create table with explicit string column types
# One entry per column; only the varchar column carries a 'size'.
article_columns = [
    # inline, fully indexable
    {'key': 'title', 'type': 'varchar', 'size': 255, 'required': True},
    # off-page, prefix index only
    {'key': 'summary', 'type': 'text', 'required': False},
    # up to ~4 M chars
    {'key': 'body', 'type': 'mediumtext', 'required': False},
    # up to ~1 B chars
    {'key': 'raw_data', 'type': 'longtext', 'required': False},
]

tables_db.create_table(
    database_id='[DATABASE_ID]',
    table_id=ID.unique(),
    name='articles',
    columns=article_columns,
)

Query Methods

# Each call below builds one query; pass them together in a list as the
# `queries` argument of a list call (e.g. list_rows).

# Filtering
Query.equal('field', 'value')             # == (or pass list for IN)
Query.not_equal('field', 'value')         # !=
Query.less_than('field', 100)             # <
Query.less_than_equal('field', 100)       # <=
Query.greater_than('field', 100)          # >
Query.greater_than_equal('field', 100)    # >=
Query.between('field', 1, 100)            # 1 <= field <= 100
Query.is_null('field')                    # is null
Query.is_not_null('field')                # is not null
Query.starts_with('field', 'prefix')      # starts with
Query.ends_with('field', 'suffix')        # ends with
Query.contains('field', 'sub')            # contains (string or array)
Query.search('field', 'keywords')         # full-text search (requires index)

# Sorting
Query.order_asc('field')                  # ascending by field
Query.order_desc('field')                 # descending by field

# Pagination
Query.limit(25)                           # max rows (default 25, max 100)
Query.offset(0)                           # skip N rows
Query.cursor_after('[ROW_ID]')            # cursor pagination (preferred)
Query.cursor_before('[ROW_ID]')           # cursor pagination, backwards from the given row

# Selection & Logic
Query.select(['field1', 'field2'])        # return only specified fields
Query.or_queries([Query.equal('a', 1), Query.equal('b', 2)])   # OR
Query.and_queries([Query.greater_than('age', 18), Query.less_than('age', 65)])  # AND (default)

File Storage

from appwrite.input_file import InputFile

storage = Storage(client)

# Upload file
file = storage.create_file(
    bucket_id='[BUCKET_ID]',
    file_id=ID.unique(),
    file=InputFile.from_path('/path/to/file.png'),
)

# List files
files = storage.list_files(bucket_id='[BUCKET_ID]')

# Delete file
storage.delete_file(bucket_id='[BUCKET_ID]', file_id='[FILE_ID]')

InputFile Factory Methods

from appwrite.input_file import InputFile

# `byte_data` is a placeholder that this snippet does not define; supply your own bytes.
InputFile.from_path('/path/to/file.png')             # from filesystem path
InputFile.from_bytes(byte_data, 'file.png')          # from bytes
InputFile.from_string('Hello world', 'hello.txt')    # from string content

Teams

from appwrite.services.teams import Teams

teams = Teams(client)

# Create team
team = teams.create(team_id=ID.unique(), name='Engineering')

# List teams
team_list = teams.list()

# Create membership (invite user by email)
membership = teams.create_membership(
    team_id='[TEAM_ID]',
    roles=['editor'],
    email='user@example.com',
)

# List memberships
members = teams.list_memberships(team_id='[TEAM_ID]')

# Update membership roles
teams.update_membership(
    team_id='[TEAM_ID]',
    membership_id='[MEMBERSHIP_ID]',
    roles=['admin'],
)

# Delete team
teams.delete(team_id='[TEAM_ID]')

Role-based access: Use `Role.team('[TEAM_ID]')` for all team members or `Role.team('[TEAM_ID]', 'editor')` for a specific team role when setting permissions.

Serverless Functions

functions = Functions(client)

# Execute function (body is sent as a string; here it holds JSON text)
execution = functions.create_execution(
    function_id='[FUNCTION_ID]',
    body='{"key": "value"}',
)

# List executions
executions = functions.list_executions(function_id='[FUNCTION_ID]')

Writing a Function Handler (Python runtime)

# src/main.py — Appwrite Function entry point
def main(context):
    """Appwrite Function entry point: greet on GET, otherwise validate the body.

    Args:
        context: Runtime-provided object. This handler reads ``context.req``
            (``method``, ``path``, ``body_json``), replies through
            ``context.res`` and logs via ``context.log`` / ``context.error``.

    Returns:
        The value produced by the ``context.res`` helper that was called:
        a greeting for GET, a 400 error when the body has no ``name`` key,
        and a success payload otherwise.
    """
    # context.req — request object
    #   .body        — raw request body (string)
    #   .body_json   — parsed JSON body (dict, or None if not JSON)
    #   .headers     — request headers (dict)
    #   .method      — HTTP method (GET, POST, etc.)
    #   .path        — URL path
    #   .query       — parsed query parameters (dict)
    #   .query_string — raw query string

    context.log(f'Processing: {context.req.method} {context.req.path}')

    if context.req.method == 'GET':
        return context.res.json({'message': 'Hello from Appwrite Function!'})

    # A JSON body may be any JSON value (list, string, number), not only an
    # object. Only a dict can carry a 'name' key, so everything else is
    # rejected instead of crashing on `in` or matching a list element.
    data = context.req.body_json
    if not isinstance(data, dict) or 'name' not in data:
        context.error('Missing name field')
        return context.res.json({'error': 'Name is required'}, 400)

    # Response methods
    return context.res.json({'success': True})                    # JSON response
    # return context.res.text('Hello')                           # plain text
    # return context.res.empty()                                 # 204 No Content
    # return context.res.redirect('https://example.com')         # 302 Redirect
    # return context.res.send('data', 200, {'X-Custom': '1'})   # custom response

Server-Side Rendering (SSR) Authentication

SSR apps (Flask, Django, FastAPI, etc.) use the server SDK to handle auth. You need two clients:

  • Admin client — uses an API key, creates sessions, bypasses rate limits (reusable singleton)
  • Session client — uses a session cookie, acts on behalf of a user (create per-request, never share)
import os
from datetime import datetime

from appwrite.client import Client
from appwrite.services.account import Account
from flask import request, jsonify, make_response, redirect

# Admin client (reusable)
# Carries the API key, so it must only ever live on the server.
admin_client = (Client()
    .set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
    .set_project('[PROJECT_ID]')
    .set_key(os.environ['APPWRITE_API_KEY']))

# Session client (create per-request)
# Shown at module level for brevity: in an application, build it inside each
# request handler rather than sharing one instance across requests.
session_client = (Client()
    .set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
    .set_project('[PROJECT_ID]'))

# Only attach a session when the browser actually sent the session cookie.
session = request.cookies.get('a_session_[PROJECT_ID]')
if session:
    session_client.set_session(session)

Email/Password Login

@app.post('/login')
def login():
    """Log in with email/password and hand the session secret to the browser.

    Reads ``email`` and ``password`` from the JSON request body, creates the
    session through the admin client, and returns a JSON response that sets
    the ``a_session_[PROJECT_ID]`` cookie.
    """
    account = Account(admin_client)
    session = account.create_email_password_session(
        email=request.json['email'],
        password=request.json['password'],
    )

    # Appwrite reports `expire` as an ISO 8601 string. Flask copies string
    # values into the cookie's Expires attribute verbatim, which is not a
    # valid cookie date, so convert to a datetime and let Flask format it.
    expires_at = datetime.fromisoformat(session['expire'].replace('Z', '+00:00'))

    # Cookie name must be a_session_<PROJECT_ID>
    resp = make_response(jsonify({'success': True}))
    resp.set_cookie('a_session_[PROJECT_ID]', session['secret'],
                    httponly=True, secure=True, samesite='Strict',
                    expires=expires_at, path='/')
    return resp

Authenticated Requests

@app.get('/user')
def get_user():
    """Return the account behind the session cookie, or 401 when it is missing."""
    secret = request.cookies.get('a_session_[PROJECT_ID]')
    if secret:
        # Fresh client for this request only; it acts as the cookie's user.
        base_client = (Client()
            .set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
            .set_project('[PROJECT_ID]'))
        user_client = base_client.set_session(secret)
        return jsonify(Account(user_client).get())

    return jsonify({'error': 'Unauthorized'}), 401

OAuth2 SSR Flow

# Step 1: Redirect to OAuth provider
@app.get('/oauth')
def oauth():
    """Start the OAuth2 flow by redirecting the browser to the provider.

    Asks Appwrite (via the admin client) for the provider's authorization URL
    and redirects to it; the provider later calls back one of the two URLs
    given here.
    """
    account = Account(admin_client)
    redirect_url = account.create_o_auth2_token(
        # Members of the Python SDK enum are upper-case (GITHUB, not Github).
        provider=OAuthProvider.GITHUB,
        success='https://example.com/oauth/success',
        failure='https://example.com/oauth/failure',
    )
    return redirect(redirect_url)

# Step 2: Handle callback — exchange token for session
@app.get('/oauth/success')
def oauth_success():
    """Finish the OAuth2 flow: trade the callback token for a session cookie.

    Reads ``userId`` and ``secret`` from the callback query string, creates
    the session through the admin client, and returns a JSON response that
    sets the ``a_session_[PROJECT_ID]`` cookie.
    """
    account = Account(admin_client)
    session = account.create_session(
        user_id=request.args['userId'],
        secret=request.args['secret'],
    )

    # Appwrite reports `expire` as an ISO 8601 string. Flask copies string
    # values into the cookie's Expires attribute verbatim, which is not a
    # valid cookie date, so convert to a datetime and let Flask format it.
    expires_at = datetime.fromisoformat(session['expire'].replace('Z', '+00:00'))

    resp = make_response(jsonify({'success': True}))
    resp.set_cookie('a_session_[PROJECT_ID]', session['secret'],
                    httponly=True, secure=True, samesite='Strict',
                    expires=expires_at, path='/')
    return resp

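Cookie security: Always set `httponly`, `secure`, and `samesite='Strict'` on the session cookie. `httponly` keeps the session secret out of reach of page scripts (limiting what an XSS bug can steal), `secure` restricts the cookie to HTTPS, and `samesite='Strict'` stops the browser from sending it on cross-site requests (CSRF). The cookie name must be `a_session_<PROJECT_ID>`.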

Forwarding user agent: Call `session_client.set_forwarded_user_agent(request.headers.get('user-agent'))` to record the end-user's browser info for debugging and security.

Error Handling

from appwrite.exception import AppwriteException

try:
    row = tables_db.get_row(
        database_id='[DATABASE_ID]',
        table_id='[TABLE_ID]',
        row_id='[ROW_ID]',
    )
except AppwriteException as e:
    print(e.message)    # human-readable error message
    print(e.code)       # HTTP status code (int)
    print(e.type)       # Appwrite error type string (e.g. 'document_not_found')
    print(e.response)   # full response body (dict)

Common error codes:

| Code | Meaning |
| --- | --- |
| 401 | Unauthorized — missing or invalid session/API key |
| 403 | Forbidden — insufficient permissions for this action |
| 404 | Not found — resource does not exist |
| 409 | Conflict — duplicate ID or unique constraint violation |
| 429 | Rate limited — too many requests, retry after backoff |

Permissions & Roles (Critical)

Appwrite uses permission strings to control access to resources. Each permission pairs an action (`read`, `update`, `delete`, `create`, or `write`, which grants create + update + delete) with a role target. By default, no user has access unless permissions are explicitly set at the row/file level or inherited from the table/bucket settings. Permissions are lists of strings built with the `Permission` and `Role` helpers.

from appwrite.permission import Permission
from appwrite.role import Role

Database Row with Permissions

doc = tables_db.create_row(
    database_id='[DATABASE_ID]',
    table_id='[TABLE_ID]',
    row_id=ID.unique(),
    data={'title': 'Hello World'},
    permissions=[
        Permission.read(Role.user('[USER_ID]')),     # specific user can read
        Permission.update(Role.user('[USER_ID]')),   # specific user can update
        Permission.read(Role.team('[TEAM_ID]')),     # all team members can read
        Permission.read(Role.any()),                 # anyone (including guests) can read
    ],
)

File Upload with Permissions

file = storage.create_file(
    bucket_id='[BUCKET_ID]',
    file_id=ID.unique(),
    file=InputFile.from_path('/path/to/file.png'),
    permissions=[
        Permission.read(Role.any()),                 # publicly readable
        Permission.update(Role.user('[USER_ID]')),   # only this user can update
        Permission.delete(Role.user('[USER_ID]')),   # only this user can delete
    ],
)

When to set permissions: Set row/file-level permissions when you need per-resource access control. If all rows in a table (or all files in a bucket) share the same rules, configure permissions at the table/bucket level and leave the row/file permissions empty.

Common mistakes:

  • Forgetting permissions — the resource becomes inaccessible to all users (including the creator)
  • `Role.any()` with `write`/`update`/`delete` — allows any user, including unauthenticated guests, to modify or remove the resource
  • `Permission.read(Role.any())` on sensitive data — makes the resource publicly readable