SQLAlchemy를 사용했을 때, 트랜잭션을 설계하는 방법을 알아보자 ❕
이전 프로젝트에서 Flask를 사용했는데, Flask는 내장 ORM이 없는 프레임워크이다. 따라서 대부분 SQLAlchemy이라는 라이브러리를 사용하는데, 해당 라이브러리에서 트랜잭션 관리는 어떻게 이루어지는지, 나는 어떻게 트랜잭션을 설계했는지 정리해보려고 한다.
SQLAlchemy 개념 및 설정 방법
SQLAlchemy 개념 및 사용법에 대해 정리해 보자 ❕ Flask로 개발한 적이 있었는데, Flask는 ORM이 내장되어 있지 않아 ORM 라이브러리인 SQLAlchemy를 이용해야 했다. 처음 사용해 본 기술이라 다시 한번 정
chchaego.tistory.com
1. SQLAlchemy - session, scoped_session
세션이란 일반적으로 DB와 클라이언트 사이에서 통신을 시작하는 것부터 종료하기까지의 수명 기간을 의미하는데, SQLAlchemy 라이브러리에서 이 세션의 구현체가 바로 sqlalchemy.orm.Session
클래스이다. 이 Session을 이용해 쿼리 작업을 처리한다.
Session Basics — SQLAlchemy 2.0 Documentation
Session Basics What does the Session do ? In the most general sense, the Session establishes all conversations with the database and represents a “holding zone” for all the objects which you’ve loaded or associated with it during its lifespan. It pro
docs.sqlalchemy.org
세션 작업과 관련한 객체와 메서드들을 살펴보자
- session
- 세션은 최초 쿼리 작업을 요청하거나 관련된 객체를 건드리는 순간 새로운 트랜잭션을 생성하고, 해당 트랜잭션은 세션이 commit 되거나 rollback 되기 전까지 유지된다.
- sessionmaker()
- db 엔진과 연결한 후 session을 생성하는 역할
- scoped_session()
- 세션의 생명주기를 관리하고, 설정값에 맞추어 세션을 생성하는 역할
- 세션은 기본적으로 객체를 건드리는 순간 새로운 세션을 생성한다고 했는데,
scoped_session
을 사용한다면 특정 작업이 마칠 때까지 동일한 세션 객체를 반환할 수 있다. - 클라이언트를 기억할 수 있는 것은 생성된 세션 객체를 내부 저장소(registry)에 저장하고 요청한 클라이언트에 알맞은 객체를 반환하기 때문이다.
- session.close()
- 해당 세션에 배치되었던 모든 ORM 객체를 제거하고 트랜잭션 및 커넥션 자원을 반납한다.
- 커넥션이 풀로 반납되면 트랜잭션 상태도 다시 롤백 된다.
- 이 작업은 ’종료’보다는 ’리셋’에 더 가깝다. 롤백이나 커밋 작업을 수행하지 않았더라도 세션의 범위를 제한하기 위해 수행할 작업을 마친 후
close()
메소드 호출이 권장된다.
- scoped_session.remove()
- 세션을 끝내고 새로운 트랜잭션을 시작하고 싶을 때 호출한다.
- 세션 객체를 저장하는 내부 저장소(registry)에서 해당 세션 객체가 제거된다. 해당 메서드에는 `Session.close()` 작업이 포함된다.
remove()
된 후 세션 객체를 재호출하면 새로운 객체를 반환한다.
다음은 scoped_session을 사용하지 않고 세션 객체를 생성하는 예제이다.
engine = create_engine(url=db_url, pool_size=POOL_SIZE, max_overflow=MAX_OVERFLOW, pool_recycle=POOL_RECYCLE, pool_timeout=POOL_TIMEOUT)
db_session = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db_session():
return db_session()
def search_all(self):
repository = TransactionTestRepository()
print(f"{get_db_session())} sleep ...")
models = repository.search_all()
time.sleep(5)
print(f"{get_db_session()} wake up ...")
return models
- session을 이용해 db관련 작업을 하는 메서드를 만들어 봤다.
- 간단하게 print로 출력해 볼 것이다.
위 코드에서, 사용자가 3번의 요청을 보낸다면 결과는 다음과 같다.
search_all()
안에서 sleep, wake up하는 세션 객체를 확인해보면 sleep하는 세션과, wake up하는 세션이 서로 다른 객체로 반환된다. 즉, 같은 스레드 안에서 get_db_session()
을 호출했을 때, 각각 다른 세션이 생성되고 있는 것이다.
(문제)
만약 같은 스레드 안에서 트랜잭션으로 묶을 A, B 작업에 대해 지금처럼 각 session 객체를 생성해 사용한다고 하자. 그럼 A가 실패하더라도 B는 정상적으로 실행되어버리고, B가 실패해도 A가 정상적으로 실행되어버린다.
⇒ ^어디서 세션(트랜잭션)을 생성해서 종료할건지, 세션의 생명 주기를 매 요청마다 관리해주는 작업이 필요해 보인다.^
(고민)
공식문서를 보면 보통 트랜잭션을 설계할 때, with문을 사용하라고 한다.
with get_db_session().begin() as session:
# 쿼리 작업
# ...
session.add(model)
with문 안에 작성한 모든 쿼리 작업은 하나의 세션 객체를 이용해 처리하고, with문이 종료될 때 자동으로 commit 해준다. (이 부분은 내가 의도한 동작이다)
하지만, 여러 작업을 하나의 트랜잭션으로 묶을 때 작업에 관련된 모든 코드를 with문 안에 넣어야 한다는 단점이 있다. 만약 코드의 간결함을 위해 메서드를 분리하고, 해당 작업과 관련한 메서드들을 호출해서 사용한다면 모든 메서드에 session을 인자로 전달해야 한다는 불편함도 있다.
따라서 나는 다른 방식으로 트랜잭션을 설계했는데, 먼저 내가 고려한 사항은 다음과 같다.
- 각 요청에서 동일한 세션 객체를 사용하는지
- pool 동작이 올바른지
- 트랜잭션 옵션을 설정할 수 있는지 (읽기용/쓰기용 분리, readOnly 같은 기능)
- 하나의 요청에서 동일한 세션 객체를 사용해 transaction commit 및 rollback이 가능한지 (원자성, 일관성)
위 네가지 고려사항을 만족하는지 확인하면서 코드를 점차 수정해 볼 것이다.
1 - 1. scoped_session
먼저, 세션의 생명주기를 설정하기 위해 찾아보니 scoped_session()이라는 메서드가 존재했다. 이는 요청한 클라이언트 및 작업에 따라 세션을 관리할 수 있게 한다.
1. '각 요청에서 동일한 세션 객체를 사용하는지' 확인해보자
scoped_session으로 세션 객체를 생성해주고, 이때 engine의 pool_size와 max_overflow의 값은 default값인 각 5, 10으로 설정했다.
engine = create_engine(url=db_url, pool_size=5, max_overflow=10)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
def get_db_session():
return db_session
class TransactionTestRepository():
def search_all(self):
query = select(MemberModel)
print(f"{get_db_session().__call__()} ready!")
models = get_db_session().execute(query).scalars().all()
print(f"{get_db_session().__call__()} excute!")
time.sleep(3)
print("wake up!")
return models
- scoped_session의 __call__ 메서드는 세션 객체를 반환하기 때문에, 현재 어떤 세션 객체를 사용하고 있는지 확인할 수 있다.
(참고)
애플리케이션 전역 설정으로, 요청이 끝날 때 마다 세션 객체를 remove할 수 있도록 미리 설정해두자!
def close_db(e=None):
db_session.remove()
def init_db(app):
app.teardown_appcontext(close_db)
`app.teardown_appcontext()`는 하나의 요청이 끝날 때마다 실행시킬 함수를 직접 지정할 수 있다.
이제, 위에서 작성했던 `search_all()`의 동작을 확인해보자 (JMeter 사용)
4개의 요청을 보낸다면, 다음과 같이 출력된다.
각 요청에 대해 ready, execute하는 세션 객체가 동일함을 확인할 수 있었다. 따라서 scoped_session을 사용할 경우 각 요청에 대해 고유한 세션이 생성되고, 각 요청에서 다른 세션 객체가 생성되는지 확인했다.
2. 'pool 동작이 올바른지' 확인해보자
engine의 pool_size와 max_overflow 값은 모두 1로 설정했다.
engine = create_engine(url=db_url, pool_size=1, max_overflow=1, pool_recycle=POOL_RECYCLE, pool_timeout=POOL_TIMEOUT)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
(풀링 설정 pool_size : 세션 수, max_overflow : pool_size를 초과하여 연결할 수 있는 세션 수)
그럼 4개의 요청을 보냈을 때, 최대 pool_size + max_overflow만큼(2개) 세션이 만들어지고, 2개의 세션이 이미 사용되고 있을 때 요청이 들어오면 해당 요청들은 대기하다가 기존 커넥션들이 종료되어야 쿼리를 실행할 수 있게 된다.
늦게 들어온 2개의 요청에 대해 세션(0x-0610, 0x-2210)은 대기(ready)하다가 이전 세션이 close됐을 때, 쿼리를 실행(execute)한다. pool 동작도 올바른지 확인했다.
(또한, 이를 통해 SQLAlchemy는 engine을 연결하는 순간에 커넥션을 생성하는 것이 아닌, 쿼리를 실제로 날리는 순간에 생성되는 것을 알 수 있었다)
2. 데코레이터
이제 트랜잭션과 관련된 처리를 검증해야 하는데, 그 전에 파이썬에서 제공하는 기능인 데코레이터에 대해 알아보자. 데코레이터는 특정 함수의 앞뒤에서 실행될 동작을 정의할 때 사용한다. `@` 뒤에 실행시킬 데코레이터를 작성하면 된다.
데코레이터를 이용한다면 공통된 로직을 간편하게 호출할 수 있고 하나의 함수는 핵심 기능에만 집중할 수 있다.
파이썬에서 정적 메서드를 정의할 때, 다음과 같이 `@staticmethod`를 붙이는데, 이 때 staticmethod가 데코레이터이다.
class Calc:
@staticmethod # 데코레이터
def add(a, b):
print(a + b)
예를 들어, 특정 함수의 실행 시간을 알고 싶을 때 데코레이터를 사용하면 유용하다.
import time
def timer_decorator(func):
def wrapper(self, *args, **kwargs):
start = time.time()
result = func(self, *args, **kwargs) # func은 test메서드를 의미
end = time.time()
print(f"{func.__name__} 실행시간 : {end - start:.4f}초")
return result
return wrapper
@timer_decorator
def test():
sum = 0
for i in range(1000):
sum += i
return sum
- `test()` 메서드가 호출되면 test가 호출되기 전에 decorator인 `timer_decorator()`가 시작된다.
- `timer_decorator()`은 start 시간을 설정하고, 실제 실행 함수 func인 `test()`를 실행한다.
- `test()` 메서드가 종료되면, end 시간을 설정하고 총 실행 시간을 출력한다.
3. 트랜잭션 설계
SQLAlchemy를 사용하면 세션 commit, close 등의 동작을 직접 실행시켜야 하는데 여기서 데코레이터를 이용하면 공통된 처리를 쉽게 실행시킬 수 있다.
먼저, 간단한 트랜잭션 동작을 데코레이터로 작성하면 다음과 같다.
def transactional(func):
def wrapper(self, *args, **kwargs):
try:
results = func(self, *args, **kwargs)
get_db_session().commit()
return results
except:
get_db_session().rollback()
raise
finally:
get_db_session().remove()
return wrapper
- 트랜잭션으로 묶을 메서드에 @transactinoal을 붙인다.
- func(실제 실행시킬 메서드)에서 사용한 세션 객체를 데코레이터에서 commit, rollback, close 해준다.
위와 같은 데코레이터를 사용하면 각 로직 마지막에 commit, rollback, remove를 작성할 필요가 없어지고, `@transactional` 안에서 호출되는 `get_db_session()`은 항상 같은 세션 객체를 반환한다.
이제 3. '트랜잭션 옵션을 설정할 수 있는지'를 만족시키기 위해 @transactional을 더 구체화 해보자
다음은 Spring에서 트랜잭션을 묶을 때 사용하는 `@Transactional` 어노테이션이다.
import org.springframework.transaction.annotation.Transactional;
@Transactional(readOnly = true)
어노테이션을 보면 readOnly
라는 옵션이 있는데, true로 설정했을 때 CUD 작업을 제한할 수 있었다.
이 기능을 방금 만든 `@transactional` 데코레이터에 적용하려고 한다.
위에서 작성한 transactional 데코레이터에 옵션을 설정하려면, 데코레이터 안의 함수를 두 번 감싸서 다음과 같이 사용하면 된다.
from utils import get_db_session
from flask import g
# 데코레이터에 옵션 설정
# options는 딕셔너리 형태로 값을 받는다
def transactional(**options):
def outer_wrapper(func):
def inner_wrapper(self, *args, **kwargs):
try:
is_read_only = options.get('read_only', False)
in_transaction = g.get('in_transaction', None)
# 트랜잭션에 타지 않았다면
if(in_transaction is None):
g.in_transaction = True
g.read_only = is_read_only
# 가장 상위의 트랜잭션에서 설정한 g객체의 read_only값을 참고해
# 읽기용 트랜잭션에서는 commit을 실행하지 않는다
if(g.get('read_only', False)):
results = func(self, *args, **kwargs)
else:
results = func(self, *args, **kwargs)
get_db_session().commit()
return results
except:
print(f"==== rollback function : {func.__name__} ====")
get_db_session().rollback()
raise
finally:
get_db_session().remove()
return inner_wrapper
return outer_wrapper
class TestRecordService():
# 데코레이터에 옵션 설정
@transactional(read_only=True)
def search_one(self, id):
# select 실행
...
@transactional()
def create_one(self):
# insert 실행
...
- 트랜잭션의 옵션
options
는 딕셔너리 형태로 값을 전달받는다. - 옵션인
read_only
값에 따라 읽기용 트랜잭션인지 쓰기용 트랜잭션인지 구분할 수 있으며, True인 경우 CUD 작업에 대해 commit 작업을 하지 않고 종료한다. - flask 전역 컨텍스트인 *g 객체를 사용해 read_only를 설정했다.
- 현재 호출하는 메서드가 이미 트랜잭션 내부에서 실행되고 있는지를 확인하기 위해 사용했다.
- ^처음 호출됐다면, 가장 상위의 `read_only` 속성에 따라 g 객체의 read_only값을 세팅하고, 처음 호출된게 아니라면 이미 세팅된 read_only 값을 사용해 commit 처리를 따르도록 했다. (Spring @Transactional의 전파수준 default인 REQUIRED를 따랐다)^
- if문에서 `g.get('read_only', False)` 값을 확인하는 이유
@transactional
데코레이터가 여러 번 호출되었을 때, 가장 상위 옵션을 따를 수 있다.
* flask 전역 컨텍스트 - g객체 ?
g객체는 `flask`에서 제공하는 전역 컨텍스트(Application Context)로, 요청 중에 유효한 데이터를 저장하는데 사용된다. g 객체를 사용하면 요청 스코프 내에서 데이터를 공유할 수 있게 해준다. 따라서 사용자의 요청이 동시에 들어오더라도 각각의 요청 내에서만 g객체가 유효하다.
g
객체는 각 요청마다 고유한 객체로, 한 요청에서 설정된 값은 다른 요청에 영향을 주지 않는다.- 요청이 완료되면
g
객체에 저장된 데이터는 삭제된다.
@transactional을 더 구체화 해서 4. ‘하나의 요청에서 동일한 세션 객체를 사용해 transaction commit 및 rollback이 가능한지’ 확인해보자
데이터베이스가 쓰기용 Primary DB / 읽기용 Replica DB가 분리되어 있다고 하자.
get_db_session()
함수에서, 위 transactional 데코레이터에서 설정한 g객체를 이용해 읽기용 세션 / 쓰기용 세션을 분리해 반환하면 된다.
primary_engine = create_engine(url=db_url, pool_size=pool_size, max_overflow=max_overflow, pool_recycle=pool_recycle, pool_timeout=pool_timeout)
replica_engine = create_engine(url=replica_db_url, pool_size=pool_size, max_overflow=max_overflow, pool_recycle=pool_recycle, pool_timeout=pool_timeout)
primary_db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
replica_db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=replica_engine))
# 가장 상위 트랜잭션에서 설정한 read_only값으로 그에 맞는 db_session을 반환한다
def get_db_session():
is_read_only = g.get('read_only', False)
return replica_db_session if is_read_only else primary_db_session
- 먼저 primary_engine, replica_engine을 분리하고 각 db_session을 설정한다.
그리고 항상 다음과 같이 remove시켜주자
def close_db(e=None):
primary_db_session.remove()
replica_db_session.remove()
def init_db(app):
app.teardown_appcontext(close_db)
다음과 같이 create_and_create()
에 @transactional
데코레이터를 붙여 실행하고, create2에서 예외를 발생시켰다.
@transactional()
def create_and_create(self):
# create 1
repository = TransactionTestRepository()
new_model = MemberModel()
new_model.name = "채고"
new_model.email = "chaego@email.com"
repository.save(new_model)
# create 2 - raise exception !
new_model2 = MemberModel()
new_model2.name = "채고22"
new_model2.email = "chaego22@email.com"
repository.save(new_model2)
raise Exception("raise exception!!")
이 경우, 두 모델 모두 저장되지 않고 rollback 됨을 확인할 수 있다. 아무런 쿼리가 실행되지 않았다.
만약 raise Exception
을 제거한다면, 다음과 같이 2개의 model이 insert된 후 commit 된다.
ISNERT가 두 번 발생함을 확인할 수 있다.
여기서 주의할 점은 트랜잭션으로 묶을 작업에 @transactional
데코레이터를 붙여야 한다.
예를 들어, 다음과 같이 `create_and_create()`함수에 `@transactional` 데코레이터를 붙이지 않고, `TransactionTestRepository().save()`에 `@transactional` 데코레이터를 붙인다면 두 번째 create 2 작업에서 예외가 발생해도 첫 번째 create 1 작업에 대해서는 rollback 되지 않는다.
def create_and_create(self):
# create 1
repository = TransactionTestRepository()
new_model = MemberModel()
new_model.name = "채고"
new_model.email = "chaego@email.com"
repository.save(new_model)
# create 2 - raise exception !
new_model2 = MemberModel()
new_model2.name = "채고22"
new_model2.email = "chaego22@email.com"
repository.save(new_model2)
raise Exception("raise exception!!")
class TransactionTestRepository():
@transactional()
def save(self, model):
get_db_session().add(model)
4. 마무리
SQLAlchemy 공식문서에서 쿼리 작업을 트랜잭션으로 묶어서 처리하려면 with문을 사용하라고 했다. 이 방법은 앞서 정리한 것 처럼, 여러 작업을 하나의 트랜잭션으로 묶을 때 작업에 관련된 모든 코드를 with문 안에 넣어야 한다는 단점이 있고, 코드의 간결함을 위해 메서드를 분리하고 해당 작업과 관련한 메서드들을 호출해서 사용한다면 모든 메서드에 session을 인자로 전달해야 한다는 불편함이 있었다.
따라서 공통된 처리를 묶을 수 있으면 좋을텐데,, 하다가 decorator를 이용해봤다. 앞뒤의 추가 작업을 로직에서 제거하고 간편하게 호출해 사용할 수 있다는 점에서 코드의 간결함 + 재사용성이 커졌다. 게다가 Spring에서 사용하는 것처럼 읽기용/쓰기용 작업을 분리할 수 있는 옵션도 설정할 수 있다. 지금은 default 값만 설정해놨는데, 시스템 규모가 늘어나고 또 다른 옵션이 생겨난다면 해당 `options` 인자를 활용해 확장할 수 있을 것이다.
Spring에서는 당연하게 사용했던 기능을 직접 설계하려니 생각보다 신경쓸게 많았는데, 이 방법이 정답이라고 할 수는 없지만 기존 방법과는 다르게 색다르게 풀어낸 것 같아 재밌었다. 크크
참고 😃
https://miintto.github.io/docs/python-sqlalchemy-session
https://docs.sqlalchemy.org/en/20/orm/session_basics.html#what-does-the-session-do
'Python' 카테고리의 다른 글
[Python] 파이썬 동시성 프로그래밍 (+ GIL, 코루틴) (2) | 2024.06.15 |
---|---|
[Python] SQLAlchemy 개념 및 설정 방법 (0) | 2024.06.05 |
[Python] CGI & WSGI & ASGI (0) | 2024.04.16 |
[Python] Django & Flask & FastAPI (0) | 2024.04.15 |