「Python」タグの記事が12件件あります
Pythonプログラミング言語、フレームワーク、データサイエンス
全てのタグを見るasdfでPythonをバージョン管理
Pythonのバージョン管理にasdfを使用することで、複数のPythonバージョンを簡単に管理できるようになりました。
asdfはPythonだけでなく、Node.js、Javaなど複数のランタイムを統一的に管理できる非常に便利なツールです。
以下の手順はasdf v0.18.0に基づいて説明します。
asdfとは
asdfは複数のプログラミング言語のランタイムバージョンを管理するためのツールです。pyenv、nvm、rbenvなどの個別のバージョン管理ツールを統一的に置き換えることができます。
主な利点:
- 複数の言語を一つのツールで管理
- プロジェクトごとのバージョン設定
- 豊富なプラグインエコシステム
asdfとvenvの違い
asdf: Pythonインタープリター自体のバージョン管理
- Python 3.11.7、3.12.1など異なるPythonバージョンを切り替え
venv: パッケージ(ライブラリ)の分離管理
- 同じPythonバージョンでもプロジェクトごとに異なるパッケージセットを使用
- 依存関係の競合を防ぐ
両方を組み合わせることで、Pythonバージョンとパッケージの両方を適切に管理できます。
asdfとvenvの違い
asdf: Pythonインタープリター自体のバージョン管理
- Python 3.11.7、3.12.1など異なるPythonバージョンを切り替え
venv: パッケージ(ライブラリ)の分離管理
- 同じPythonバージョンでもプロジェクトごとに異なるパッケージセットを使用
- 依存関係の競合を防ぐ
両方を組み合わせることで、Pythonバージョンとパッケージの両方を適切に管理できます。
asdfのインストール
macOSの場合
公式ガイドに従ってインストールします。
brew install asdf
zshの設定
~/.zshrcファイルに以下を追加:
# asdfの設定
export PATH="${ASDF_DATA_DIR:-$HOME/.asdf}/shims:$PATH"
export ASDF_DATA_DIR="$HOME/.asdf/data"
補完機能の設定
# 補完ディレクトリの作成
mkdir -p "${ASDF_DATA_DIR:-$HOME/.asdf}/completions"
# zsh補完ファイルの生成
asdf completion zsh > "${ASDF_DATA_DIR:-$HOME/.asdf}/completions/_asdf"
再度~/.zshrcに以下を追加:
# asdf補完の有効化
fpath=(${ASDF_DATA_DIR:-$HOME/.asdf}/completions $fpath)
autoload -Uz compinit && compinit
設定を反映:
source ~/.zshrc
Pythonプラグインのインストールと設定
プラグインの追加
# Pythonプラグインを追加
asdf plugin add python
# インストール済みプラグインの確認
asdf plugin list
利用可能なPythonバージョンの確認
# インストール可能なPythonバージョンを表示
asdf list all python
Pythonのインストール
# 最新版をインストール
asdf install python latest
# 特定バージョンをインストール(例)
asdf install python 3.11.7
# インストール済みバージョンの確認
asdf list python
バージョンの設定
asdf v0.18.0ではasdf setコマンドを使用します:
# グローバル設定(HOMEフォルダ以下全体で使用)
asdf set --home python latest
# 現在の設定確認
asdf current python
出力例:
Name Version Source
python 3.13.5 /Users/username/.tool-versions
# Pythonコマンドの場所確認
which python3
出力例:
/Users/username/.asdf/data/shims/python3
プロジェクト固有の設定
プロジェクトディレクトリで特定のPythonバージョンを使用する場合:
# プロジェクトディレクトリに移動
cd /path/to/your/project
# プロジェクト用のPythonバージョンを設定
asdf set python 3.11.7
# .tool-versionsファイルが作成される
cat .tool-versions
出力例:
python 3.11.7
このファイルがあるディレクトリとその配下では、指定されたPythonバージョンが自動的に使用されます。
よく使うコマンド一覧
# 現在使用中のバージョン確認
asdf current
# 特定言語の現在バージョン
asdf current python
# インストール済みバージョン一覧
asdf list python
# 利用可能バージョン一覧(最新10件)
asdf list all python | tail -10
# プラグインの更新
asdf plugin update python
# 古いバージョンの削除
asdf uninstall python 3.10.0
トラブルシューティング
Pythonが見つからない場合
# shimの再作成
asdf reshim python
# パスの確認
echo $PATH | grep asdf
権限エラーが発生する場合
# asdfディレクトリの権限確認
ls -la ~/.asdf/
# 必要に応じて権限修正
chmod -R 755 ~/.asdf/
設定が反映されない場合
# .zshrcの再読み込み
source ~/.zshrc
# 現在のシェル設定確認
echo $ASDF_DATA_DIR
echo $PATH | grep asdf
実際の使用例
新しいプロジェクトでの設定
# プロジェクトディレクトリを作成
mkdir my-python-project
cd my-python-project
# プロジェクト用のPythonバージョンを指定
asdf set --local python 3.11.7
# 仮想環境の作成
python -m venv venv
source venv/bin/activate
# パッケージのインストール
pip install requests flask
チームでの.tool-versionsファイル共有
# .tool-versionsファイルをGitで共有
echo "python 3.11.7" > .tool-versions
echo "nodejs 18.19.0" >> .tool-versions
# チームメンバーは以下でバージョンをインストール
asdf install
設定が反映されない場合
# .zshrcの再読み込み
source ~/.zshrc
# 現在のシェル設定確認
echo $ASDF_DATA_DIR
echo $PATH | grep asdf
実際の使用例
新しいプロジェクトでの設定
# プロジェクトディレクトリを作成
mkdir my-python-project
cd my-python-project
# 1. asdfでPythonバージョンを指定(インタープリター管理)
asdf set python 3.11.7
# 2. venvで仮想環境を作成(パッケージ管理)
python -m venv venv
source venv/bin/activate
# 3. プロジェクト固有のパッケージをインストール
pip install requests flask
# 結果:Python 3.11.7 + 独立したパッケージ環境
なぜvenvも必要なのか?
問題:venvを使わない場合
# プロジェクトA
cd project-a
asdf set python 3.11.7
pip install django==4.2.0 requests==2.28.0
# プロジェクトB(同じPython 3.11.7)
cd ../project-b
pip install flask==2.3.0 requests==2.31.0 # ← requests 2.28.0が上書きされる
# プロジェクトAに戻ると...
cd ../project-a
# django が requests 2.31.0 で動作しない可能性!
解決:asdf + venv
# プロジェクトA: Python 3.11.7 + 独立した環境
project-a/ → django 4.2.0 + requests 2.28.0
# プロジェクトB: Python 3.11.7 + 独立した環境
project-b/ → flask 2.3.0 + requests 2.31.0
チームでの.tool-versionsファイル共有
# .tool-versionsファイルをGitで共有
echo "python 3.11.7" > .tool-versions
echo "nodejs 18.19.0" >> .tool-versions
# チームメンバーは以下でバージョンをインストール
asdf install
まとめ
asdfを使用することで以下のメリットが得られます:
- 統一的な管理: 複数の言語のバージョン管理を一つのツールで実現
- プロジェクト固有設定:
.tool-versionsファイルでプロジェクトごとの環境を管理 - 簡単な切り替え: コマンド一つでバージョン切り替えが可能
- チーム開発:
.tool-versionsファイルでチーム全体の環境を統一
Python開発での環境構築が大幅に簡素化されるため、特に複数のプロジェクトを扱う開発者にはぜひ活用していただきたいツールです。
Python FAISSでベクトル検索
- FAISSを使って高次元ベクトルの中から、クエリベクトルに最も「似ている」ベクトルを高速に検索できる
- RAGにおいて、FAISSを使って、関連文書を検出ことができる
詳細
高次元の課題 (Curse of Dimensionality): 次元数が増えると(例えば、数千次元のベクトル)、ベクトル間の距離計算は非常に高コストになり、従来のデータベース手法では効率的に検索できなくなります。FAISSは、この高次元空間での非効率性を克服するための様々な工夫(近似検索アルゴリズム)を提供します。
類似性検索のニーズ: 「意味的に似ているものを探す」「内容が近い画像を見つける」「類似する推薦アイテムを提案する」といった、人間の直感的な「似ている」を数値データから見つけ出すニーズが、AI/MLの発展とともに爆発的に増加しました。FAISSは、このニーズに応えるための高速なソリューションです。
スケーラビリティ: データ量が膨大になっても、秒単位やミリ秒単位で検索結果を返すことが求められます。FAISSは、インデックス構造の工夫や並列処理、GPU活用などにより、このスケーラビリティを実現します。
FAISSは、基本的にメモリ内で高速な類似検索を行いますが、大規模なデータセットに対応するために、メモリ管理やストレージとの連携に関する様々な機能や戦略を持っています。
利用
FAISSのAPIは、大きく分けてインデックスの作成と検索の2つのフェーズで構成されます。
- インデックスの作成(Add / Train) まず、検索対象となるベクトルデータをFAISSのインデックス構造に「追加」します。
import faiss
import numpy as np
# 1. ベクトルデータの準備
# 例として、128次元のベクトルが1000個あると仮定
dimension = 128
num_vectors = 1000
data_vectors = np.random.rand(num_vectors, dimension).astype('float32')
# 2. インデックスの選択と初期化
# 最もシンプルなインデックス: IndexFlatL2
# L2 はユークリッド距離を意味し、最も厳密な(近似ではない)検索を行う
index = faiss.IndexFlatL2(dimension)
print(f"インデックスが空か?: {index.is_trained}") # IndexFlatL2は訓練不要なのでTrue
print(f"インデックスのベクトルの数: {index.ntotal}") # 初期状態では0
# 3. インデックスへのベクトルの追加
index.add(data_vectors)
print(f"インデックスのベクトルの数: {index.ntotal}") # 1000
faiss.IndexFlatL2(dimension): これは最も基本的なインデックスで、全てのベクトルをそのままメモリに格納し、クエリが来たら総当たりで距離を計算します(ブルートフォース検索)。L2 はユークリッド距離(二点間の直線距離)を意味します。これは厳密な最近傍探索を行いますが、データ量が増えると非常に遅くなります。
index.is_trained: インデックスによっては、データを追加する前に「訓練(train)」が必要なものがあります(例: IndexIVFFlat, IndexPQ)。これは、ベクトル空間を効率的に分割するためのクラスタリングなどを行います。IndexFlatL2 は訓練不要なので最初から True です。
index.add(vectors): 訓練が終わった(または不要な)インデックスに、実際にベクトルデータを追加します。
2. 検索 (Search)
# 1. クエリベクトルの準備
# 例として、1つのクエリベクトルを準備
query_vector = np.random.rand(1, dimension).astype('float32') # 1つのクエリは (1, dimension) 形状
# 2. 検索の実行
# k=5: 最も近い5つのベクトルを検索
k = 5
distances, indices = index.search(query_vector, k)
print(f"検索されたベクトルのインデックス (元の data_vectors での): {indices}")
print(f"検索されたベクトルの距離: {distances}")
index.search(query_vectors, k):
query_vectors: 検索したいベクトル。複数個のクエリを一度に渡すことも可能です(例: (num_queries, dimension) のNumPy配列)。
k: 取得したい最近傍ベクトルの数。
戻り値:
distances: 検索された k 個のベクトルとクエリベクトルとの間の距離。
indices: 検索された k 個のベクトルが、インデックス追加時に持っていた元のインデックス(add したときの順番)を示します。
3. インデックスの保存とロード
作成したインデックスはファイルに保存し、後でロードして再利用できます。
# インデックスの保存
faiss.write_index(index, "my_faiss_index.bin")
print("インデックスを 'my_faiss_index.bin' に保存しました。")
# インデックスのロード
loaded_index = faiss.read_index("my_faiss_index.bin")
print(f"ロードされたインデックスのベクトルの数: {loaded_index.ntotal}")
Python Development Setup, pyenv, uv
This article introduce how to setup python development environment for Web development.
For data science / machine learning development environment, you might prefer other approaches, like anaconda.
Details
Install uv
US is the modern solution and handles both Python versions AND package management. It's like having nvm + npm in one tool.
curl -LsSf https://astral.sh/uv/install.sh | sh
source $HOME/.local/bin/env
Use uv
# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh
# Install and use Python
uv python install 3.12
uv python pin 3.12 # Sets Python version for project
Use uv to initialize the project
cd my-project
uv init --python 3.12
Then the following files are generated:
tree
├── main.py
├── pyproject.toml
└── README.md
pyproject.toml is just like package.json
Initialize venv:
% uv venv
Using CPython 3.12.11
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
% source .venv/bin/activate
You can use venv in this project now.
# install deps
# it automatically use venv
uv sync
# add new package
uv add fastapi sqlalchemy alembic
Check the pyproject.toml file's
If want to init a fastapi project, using fastapi's template here might be a better choice.
git clone git@github.com:fastapi/full-stack-fastapi-template.git my-full-stack
cd my-full-stack
git remote set-url origin git@github.com:octocat/my-full-stack.git
git push -u origin master
Pytest
- pytest を使うことで、テストコードを簡潔かつ可読性の高いものにできる
- フィクスチャを活用することで、テストの前提条件の準備や後処理を柔軟に記述可能
- デコレーターによってデータ駆動テストをシンプルに記述できる
- 詳細なテスト結果を提供し、デバッグが容易
- 豊富なプラグインエコシステムにより、機能の拡張も容易
はじめに
Jest や JUnit などの他のテストフレームワークと比べて、pytest の導入は非常にシンプルです。
インストール
pip install -U pytest
セットアップ
pytest は、以下の簡単なルールに基づいてテストケースを自動的に検出します:
- テストファイル名:test_ で始まるか、_test.py で終わるファイル
- テスト関数/クラス:test_ で始まる関数、または Test で始まるクラスの中にある関数(ただし init は不可)
特別な設定ファイルなどは基本的に不要で、すぐにテストコードの記述を始められます。
一般的には、プロジェクトルートに tests/ ディレクトリを作成してテストコードを配置します。
my_project/
├── my_package/
│ ├── __init__.py
│ ├── module_a.py
│ └── module_b.py
├── tests/
│ ├── __init__.py # Can be empty, but often present
│ ├── unit/ # Optional: subdirectories for different test types
│ │ ├── test_module_a_unit.py
│ │ └── test_module_b_unit.py
│ ├── integration/ # Optional: for integration tests
│ │ └── test_api_integration.py
│ ├── functional/ # Optional: for end-to-end tests
│ │ └── test_user_flow.py
│ └── conftest.py # For shared fixtures (more on this below)
├── README.md
└── setup.py
テストの書き方
基本は assert を使って期待値を検証
# test_calculations.py
from my_functions import add, subtract, multiply
def test_add_positive_numbers():
assert add(2, 3) == 5
def test_add_negative_numbers():
assert add(-1, -5) == -6
@pytest.fixture を使った前処理・後処理の共通化
目的
- テスト実行前の初期状態の準備(例:DB接続、設定ファイル、モックの準備)
- テスト後のクリーンアップ処理(例:接続の切断、一時ファイル削除)
- 共通処理の再利用による重複の排除
定義方法
- 通常の Python 関数に @pytest.fixture をつけて定義します
- yield の前に前処理、後に後処理を記述します
- 使用する側では引数にフィクスチャ名を指定するだけで自動的に注入されます
利用方法
- フィクスチャを使用したいテスト関数や他のフィクスチャの引数に、定義したフィクスチャ関数名を指定するだけです。Pytestが自動的に依存性を解決し、フィクスチャが提供する値を注入してくれます。
スコープの指定
フィクスチャはデフォルトで function スコープですが、scope 引数でその実行頻度を変更できます。
- function (デフォルト): 各テスト関数ごとに1回実行。
- class: テストクラス内の全テストメソッドに対して1回実行。
- module: モジュール(ファイル)内の全テストに対して1回実行。
- session: テストセッション全体で1回だけ実行。
フィクスチャの依存関係
フィクスチャは他のフィクスチャを引数として受け取ることができます。
conftest.py を使ったフィクスチャの共有
フィクスチャは、conftest.py という特別なファイルに定義することで、複数のテストファイル間で共有することができます。
conftest.py に定義されたフィクスチャは、明示的なインポートなしに、同じディレクトリやサブディレクトリ内のすべてのテストファイルから利用可能です。これにより、プロジェクト全体で共通のセットアップロジックを一元管理できます。
@pytest.fixture(scope="module") # モジュールスコープに設定
def module_db_connection():
"""モジュール内で一度だけ実行されるDB接続のフィクスチャ"""
print("\n[Module Scope] DB接続を確立しました。")
db_conn = {"status": "connected", "data": []}
yield db_conn
print("[Module Scope] DB接続を切断しました。")
@pytest.fixture
def empty_db():
"""A fixture that provides an empty dictionary, simulating an empty database."""
print("\nSetting up empty_db...") # This will print during test execution
_mock_db.clear() # Ensure it's empty before each test using this fixture
yield _mock_db # Yield the resource to the test
print("Tearing down empty_db...") # This runs after the test finishes
_mock_db.clear() # Clean up after the test
@pytest.fixture
def populated_db():
"""A fixture that provides a populated dictionary, simulating a database with some data."""
print("\nSetting up populated_db...")
_mock_db.clear()
_mock_db["user1"] = {"name": "Alice", "email": "alice@example.com"}
_mock_db["user2"] = {"name": "Bob", "email": "bob@example.com"}
yield _mock_db
print("Tearing down populated_db...")
_mock_db.clear()
def test_add_user_to_empty_db(module_db_connection, empty_db):
"""Test adding a user to an initially empty database."""
print("Running test_add_user_to_empty_db...")
empty_db["user3"] = {"name": "Charlie", "email": "charlie@example.com"}
assert "user3" in empty_db
assert len(empty_db) == 1
def test_retrieve_user_from_populated_db(populated_db):
"""Test retrieving an existing user from a populated database."""
print("Running test_retrieve_user_from_populated_db...")
user = populated_db.get("user1")
assert user is not None
assert user["name"] == "Alice"
assert user["email"] == "alice@example.com"
@pytest.mark.parametrize でデータ駆動テスト
同じロジックを異なるデータセットで繰り返しテストしたい場合、parametrize を使えばコードを繰り返す必要がありません。
import pytest
def is_palindrome(s):
"""Checks if a string is a palindrome."""
cleaned_s = "".join(filter(str.isalnum, s)).lower()
return cleaned_s == cleaned_s[::-1]
@pytest.mark.parametrize("input_string, expected_output", [
("racecar", True),
("madam", True),
("A man, a plan, a canal: Panama", True), # With punctuation and spaces
("hello", False),
("Python", False),
("", True), # Empty string is a palindrome
("a", True), # Single character is a palindrome
])
def test_is_palindrome(input_string, expected_output):
"""Test the is_palindrome function with various inputs."""
assert is_palindrome(input_string) == expected_output
その他
テストをスキップする方法。 実際のプロジェクトを開発する際に、テストをスキップするのは、実務上たまに使ってるユースケースです。
Pytestを使ったら簡単にできます。
# test_feature_status.py
import pytest
def divide(a, b):
if b == 0:
raise ZeroDivisionError("Cannot divide by zero")
return a / b
@pytest.mark.skip(reason="This feature is not yet implemented")
def test_new_feature_logic():
"""A test for a feature that's still under development."""
assert 1 == 2 # This test would fail, but it's skipped
@pytest.mark.skipif(
pytest.__version__ < "8.0",
reason="Requires pytest version 8.0 or higher"
)
def test_new_pytest_feature():
"""This test only runs if a specific Pytest version is met."""
assert True
@pytest.mark.xfail(reason="Bug #1234: Division by zero is not handled gracefully yet")
def test_divide_by_zero_xfail():
"""This test is expected to fail due to a known bug."""
assert divide(10, 0) == 0 # This will raise ZeroDivisionError, but it's xfailed
総評
pytest はシンプルながらも強力な機能を持ち、テストコードの質を大きく向上させることができます。特にフィクスチャやパラメータ化テストを活用することで、実用的で保守性の高いテストが可能です。
FastAPI Authentication
- Use jose to encode / decode jwt
- Use passlib to verify hashed password
Details
Utility functions
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
# --- Configuration ---
# You should get these from environment variables in a real application
SECRET_KEY = "your-super-secret-jwt-key" # MAKE THIS A LONG, RANDOM STRING!
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Example: 30 minutes
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2PasswordBearer to extract token from Authorization header
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # "token" is the endpoint for login
# --- Password Hashing Functions ---
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verifies a plain password against a hashed password."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hashes a plain password."""
return pwd_context.hash(password)
# --- JWT Token Functions ---
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Creates a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def decode_access_token(token: str) -> dict:
"""Decodes and validates a JWT access token."""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# You can add more validation here, e.g., check for 'sub' or 'user_id'
return payload
except JWTError:
raise credentials_exception
# --- Dependency for current user ---
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
"""
Dependency to get the current user from a JWT token.
Raises an HTTPException if the token is invalid or expired.
"""
payload = decode_access_token(token)
user_id: str = payload.get("sub") # 'sub' is commonly used for subject (e.g., user ID)
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload",
headers={"WWW-Authenticate": "Bearer"},
)
# In a real app, you would fetch the user from your database here
# to ensure they still exist and are active.
# For simplicity, we'll just return the user_id for now.
return {"user_id": user_id}
Usage
# A helper function to authenticate user (combines with your "database" logic)
def authenticate_user(username: str, password: str) -> UserInDB | None:
user = get_user(username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
"""
Login endpoint to generate JWT access token.
Uses OAuth2PasswordRequestForm for standard username/password input.
"""
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, # 'sub' claim typically holds the user identifier
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: dict = Depends(get_current_user)):
"""
Protected endpoint: Returns information about the current authenticated user.
Requires a valid JWT token in the Authorization header.
"""
# In a real application, you would fetch the full user object from the DB
# using current_user["user_id"]
username = current_user["user_id"]
user_data = fake_users_db.get(username)
if not user_data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return User(**user_data)
@app.get("/protected-route/")
async def protected_route(current_user: dict = Depends(get_current_user)):
"""
Another protected endpoint.
"""
return {"message": f"Welcome, {current_user['user_id']}! You accessed a protected route."}
# Example of a public endpoint
@app.get("/")
async def read_root():
return {"message": "Welcome to the unauthenticated public endpoint!"}
Python SQL Alchemy
- Use sqlalchemy library to read/write db
- engine is the database connection gateway
- Base: The Declarative Base for ORM Models
Details
engine is the primary communication hub between your Python application and your actual database.
Connection Management: It's responsible for managing a pool of database connections. Instead of opening and closing a new connection for every single operation (which is slow and resource-intensive), the engine keeps a pool of ready-to-use connections.
Dialect Specifics: It understands the "dialect" of the specific database you're using (e.g., MySQL, PostgreSQL, SQLite). It translates SQLAlchemy's generic commands into the correct SQL syntax for that database.
Statement Execution: It's the underlying component that actually sends SQL statements to the database and receives results.
Transaction Management: It works with sessions to manage transactions.
Base object is the foundation upon which you build your SQLAlchemy ORM models. It links your Python classes to your database tables.
import os
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import SQLAlchemyError
from contextlib import contextmanager # For a clean session manager
# --- Database Configuration ---
# You'd typically get these from environment variables or a config file
DB_USER = os.environ.get("DB_USER", "myuser")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "mypassword")
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "3306")
DB_NAME = os.environ.get("DB_NAME", "my_test_db")
# MySQL connection string using PyMySQL driver
# Format: mysql+pymysql://user:password@host:port/dbname
DATABASE_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
print(f"Attempting to connect to: {DATABASE_URL}")
# --- 1. Create the Engine ---
# The engine manages the connection pool and dialect specifics.
# echo=True is great for debugging; it logs all SQL statements to console.
engine = create_engine(DATABASE_URL, echo=True, pool_pre_ping=True)
# --- 2. Define the Base ---
# Base is the declarative base class that our ORM models will inherit from.
Base = declarative_base()
# --- 3. Define the ORM Model ---
# This Python class maps to a database table.
class User(Base):
__tablename__ = 'users' # The actual table name in the database
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True, nullable=False)
def __repr__(self):
return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"
# --- 4. Create the Session Factory ---
# A sessionmaker factory creates Session objects.
# Sessions are the actual interface for database operations (transactions, queries).
Session = sessionmaker(bind=engine)
# --- Context Manager for Session (Best Practice) ---
# This ensures the session is properly closed even if errors occur.
@contextmanager
def get_session():
session = Session()
try:
yield session
session.commit() # Auto-commit on success
except SQLAlchemyError as e:
print(f"Database error occurred: {e}")
session.rollback() # Rollback on error
raise # Re-raise the exception after rollback
finally:
session.close() # Always close the session
# --- CRUD Operations ---
def create_tables():
print("\n--- Creating tables ---")
try:
# Base.metadata contains all table definitions inherited from Base.
# create_all creates these tables in the database linked by the engine.
Base.metadata.create_all(engine)
print("Tables created successfully.")
except SQLAlchemyError as e:
print(f"Error creating tables: {e}")
def create_user(name: str, email: str):
print(f"\n--- Creating user: {name} ({email}) ---")
with get_session() as session:
new_user = User(name=name, email=email)
session.add(new_user)
print(f"Added user: {new_user}")
return new_user
def read_users():
print("\n--- Reading users ---")
with get_session() as session:
users = session.query(User).all() # Query all users
if users:
for user in users:
print(user)
else:
print("No users found.")
return users
def read_user_by_email(email: str):
print(f"\n--- Reading user by email: {email} ---")
with get_session() as session:
user = session.query(User).filter_by(email=email).first() # Query by email
if user:
print(f"Found user: {user}")
else:
print(f"User with email '{email}' not found.")
return user
def update_user_email(user_id: int, new_email: str):
print(f"\n--- Updating user {user_id}'s email to {new_email} ---")
with get_session() as session:
user = session.query(User).filter_by(id=user_id).first()
if user:
old_email = user.email
user.email = new_email
# session.commit() is handled by the context manager on success
print(f"Updated user {user.name} email from {old_email} to {user.email}")
return user
else:
print(f"User with ID {user_id} not found.")
return None
def delete_user(user_id: int):
print(f"\n--- Deleting user with ID: {user_id} ---")
with get_session() as session:
user = session.query(User).filter_by(id=user_id).first()
if user:
session.delete(user)
# session.commit() is handled by the context manager on success
print(f"Deleted user: {user.name} ({user.id})")
return user
else:
print(f"User with ID {user_id} not found.")
return None
# --- Main Execution ---
if __name__ == "__main__":
# Ensure environment variables are set or defaults are acceptable
if "DB_USER" not in os.environ:
print("WARNING: DB_USER environment variable not set. Using default 'myuser'.")
if "DB_PASSWORD" not in os.environ:
print("WARNING: DB_PASSWORD environment variable not set. Using default 'mypassword'.")
# 1. Create tables (only run this once or when schema changes)
create_tables()
# 2. Create some users
user1 = create_user("Alice", "alice@example.com")
user2 = create_user("Bob", "bob@example.com")
# Try to create user with duplicate email to see error handling
try:
create_user("Charlie", "alice@example.com")
except SQLAlchemyError:
print(" (Expected error: Duplicate email caught and rolled back)")
# 3. Read all users
read_users()
# 4. Read a specific user by email
read_user_by_email("bob@example.com")
read_user_by_email("nonexistent@example.com")
# 5. Update a user
if user1: # Only if user1 was created successfully
update_user_email(user1.id, "alice.new@example.com")
read_user_by_email("alice.new@example.com")
# 6. Delete a user
if user2: # Only if user2 was created successfully
delete_user(user2.id)
read_users() # Show that Bob is gone
# Try deleting a non-existent user
delete_user(999)
print("\n--- All operations complete ---")
# In a real application, the engine would be disposed when the app shuts down.
# For this script, Python will clean it up on exit.
Python File System Operations
- Use pathlib to handle file paths and operations by default
- For finegraind read/write over file I/O (streaming), use context manager
- Use the
tempfileModule for Temporary Files/Directories - Use shutil for High-level Operations
Details
Use pathlib is the modern, oo way to handle file paths, and operations.
from pathlib import Path
# Create Path Objects:
my_file = Path("data")
# Use Path methods
my_file.exists()
# Content I/O
my_file.read_text()
#Path.write_text()
#Path.write_text()
#Path.write_bytes()
Create a Path Object
# From CWD
#
current_dir = Path.cwd()
# From Home
home_dir = Path.home()
# From absolute Paths
abs_path = Path("/usr/local/bin/python")
# From relative paths (relative to CWD)
relative_path = Path("data/input.csv")
# Create path by manipulation
base_dir = Path('/opt/my_app')
config_file = base_dire / "config" / "settings.yaml"
parent_dir = config_file.parent
Dealing with file name
# Get file / directory name
config_file.name
# Getting Stem
config_file.stem # settings
# Getting suffix
config_file.suffix
config_file.suffixes
# Get absolute path
config_file.resolve()
# or
config_file.absolute()
# Get relative path
relative_to = config_file.relative_to(project_root)
Check / Query File System
my_file.exists()
my_file.is_file()
my_file.is_dir()
my_file.is_symlink()
# Statistics
stats = temp_file.stat()
Operations
# Create directories
new_dir.mkdir()
# create empty file
empty_file.touch()
# delete file
file_to_delete.unlink()
# delete empty directories
empty_folder.rmdir()
# rename / move file or directories
old_path.rename(new_path)
# Changing suffix
config_file.with_suffix('.yml')
File Content I/O
config_path = Path("config.txt")
config_path.write_text("debug=True\nlog_level=INFO")
content = config_path.read_text()
binary_data_file = Path("binary_data.bin")
binary_data_file.write_bytes(b'\x01\x02\x03\x04')
data = binary_data_file.read_bytes()
print(f"Binary data: {data}")
directory iteration / traversal
# List
project_root.iterdir()
# Globbing
project_root.glob("*.py")
# Walking Directory Tree (Python 3.12+)
project_root.walk()
Use Context Managers (with open(...)) for File I/O
When you need more fine-grained control over file reading/writing, (streaming large files, specific encoding, or binary modes), use the with statement.
try:
with open("my_large_file.csv", "w", encoding="utf-8") as f:
f.write("Header1,Header2\n")
for i in range(1000):
f.write(f"data_{i},value_{i}\n")
except IOError as e:
print(f"Error writing file: {e}")
Use the tempfile Module for Temporary Files/Directories
import tempfile
from pathlib import Path
# Using a temporary directory
with tempfile.TemporaryDirectory() as tmp_dir_str:
tmp_dir = Path(tmp_dir_str)
temp_file = tmp_dir / "temp_report.txt"
temp_file.write_text("Ephemeral data.")
print(f"Created temporary file at: {temp_file}")
# At the end of the 'with' block, tmp_dir_str and its contents are deleted
print("Temporary directory removed.")
Use shutil for High-level Operations
shutil Focuses on operations that involing moving, copying, or deleting entire trees of files and directories, or other utility functions that go beyond a single Path obejct's scope.
import shutil
source_dir = Path("my_data")
destination_dir = Path("backup_data")
try:
shutil.copytree(source_dir, destination_dir)
print(f"Copied '{source_dir}' to '{destination_dir}'")
except FileExistsError:
print(f"Destination '{destination_dir}' already exists. Skipping copy.")
except Exception as e:
print(f"Error copying tree: {e}")
import shutil
from pathlib import Path
dir_to_delete = Path("backup_data") # Assuming this exists from the copytree example
if dir_to_delete.exists():
print(f"Deleting '{dir_to_delete}'...")
shutil.rmtree(dir_to_delete)
print("Directory deleted.")
else:
print(f"Directory '{dir_to_delete}' does not exist.")
Zip / Tarring
shutil even can create compressed archieves, and unpack them.
archive_path = shutil.make_archive(archive_name, 'zip', source_dir)
print(f"Created archive: {archive_path}")
Copy File Metadata
- shutil.copystat(src, dst) copy permission bits, last access time, last modification time and flags from one file to another
- shutil.copy2(src, dst) copies the file and metadata
Getting Disk Usage
usage = shutil.disk_usage(Path(".")) # Check current directory's disk
print(f"Total: {usage.total / (1024**3):.2f} GB")
print(f"Used: {usage.used / (1024**3):.2f} GB")
print(f"Free: {usage.free / (1024**3):.2f} GB")
Do not
- Avoid os.system() or subprocess.run() for file operations in most case
Fluent Python Data model
- By implementing the special methods (__len__, etc), it hook into built-in operations (len(), etc)
- By implementing special methods combination they build protocols (interfaces)
- By overloading the methods __add__, object participate in operations like +
- Object Customization Hooks: __getattr__
Special Methods
None of special methods is directly called. Python interpreter is the only frequent caller of most special methods.
\_\_repr\_\_: is called by repr, to get the string representation of the object for inspection. returned by \_\_repr\_\_ should be unambiguous and sometimes match the source code necessary to re-create the represented object.
\_\_str\_\_: called by the str() built-in and implicitly used by the print function. It should return a string suitable for display to end users. If \_\_repr\_\_ is user-friendly you don't need to code \_\_str\_\_
If you only implement one of them, choose, \_\_repr\_\_.
To determine whether a value x is truthy or falsy, Python applies bool(x), which returns either True or False.
By default, instances of user-defined classes are considered truthy, unless either \_\_bool\_\_ or \_\_len\_\_ is implemented.
bool(x) will call x.__bool__() and use the result. If \_\_bool\_\_ is not implemented, Python tries to invoke x.__len__(), and if that returns zero, bool returns False. Otherwise bool returns True.
Collection API
- Iterable to support for, unpacking, and ohter forms of iteration.
- Sized to support the len built-in function
- Container to support the in operator
Three important Collections:
- Sequence: formalizing the interface of built-ins like list and str;
- Mapping: implemented by dict, collections.defaultdict, etc
- Set: the interface of the set and frozenset built-in types
String/bytes representation: \_\_repr\_\_, \_\_str\_\_, \_\_format\_\_, \_\_bytes\_\_, \_\_fspath\_\_
Conversation to number: \_\_bool\_\_, \_\_complex\_\_, \_\_int\_\_, \_\_float\_\_, \_\_hash\_\_, \_\_index\_\_
Emulating collections: \_\_len\_\_, \_\_getitem\_\_, \_\_setitem\_\_, \_\_delitem\_\_, \_\_contains\_\_
Iteration: \_\_iter\_\_, \_\_aiter\_\_, \_\_next\_\_, \_\_anext\_\_, \_\_reversed\_\_
- List comprehensions and the basics of generator expressions
- Using tuples as records, versus using tuples as immutable lists
- Sequence unpacking and sequence patterns
- Reading from slices and writing to slices
- Specialized sequence types, like arrays and queues
Sequences:
- Container sequences: Can hold items of different types, including nested containers. Some examples: list, tuple, and collections.deque.
- Flat sequences: Hold items of one simple type: Some examples: str, bytes, and array.array.
A container sequence holds references to the objects it contains, while flat sequence stores the value of its contents in its own memory space, and not as distinct python objects.
特徴:
mutable vs immutable container vs flat
list: a mutable container
List comprehensions
Python Async Programming
Python's asynchronous programming is built around the asyncio module, and async/await keywords.
Concept
coroutine is a special type of function that represents a computation that can be paused and resumed.
A coroutine is defined with async def.
For example the following function is a coroutine
async def my_coroutine():
print("Coroutine started")
await asyncio.sleep(1) # This is a pause point
print("Coroutine resumed after 1 second")
return "Done!"
- Inside an async def function, the await keyword is used to pause the execution of the current coroutine.
- When a coroutine awaits something, it singals to the event loop that it's waiting for an I/O operation or some other asynchronous event to complete
- While the current coroutine is paused, the event loop can switch its attention to other coroutines or tasks that are ready to run, ensuring efficient use of the CPU.
Why async def functions can be paused
-
A regular def function is executed directly by the Python interpreter, when you call it the interpreter's program counter moves through its instructions sequentially. If it encounters something that blocks, the entire thread stops until that blocking operation is done.
-
An Async def function, when called doesn't immediately execute its body. Instead it returns a coroutine object. This object is a special kind of generator that the asyncio event loop knows how to manage.
-
use the await keyword to singal an intentional pause.
-
if there is no await inside an async def function, it will run like regular synchronous function until completion.
The Event loop is the orchestrator.
-
The asyncio event loop is continuously monitoring a set of registered coroutines/tasks. It's like a dispatcher.
-
State Preservation: (Generators)
Conceptually, Python coroutines are built on top of generators. When a generator yields a value, its local state (variables, instruction pointer) is saved. When next() is called on it again, it resumes from where it left off.
Similarly, when an async def function awaits, its internal state is saved. When the awaited operation completes, the coroutine is "sent" a signal to resume, and it continues execution from the line immediately following the await.
