Reusing the GCP Connector

This commit is contained in:
Tim O'Farrell 2025-12-23 10:17:49 -07:00
parent fae83230ee
commit 44f26ad061
2 changed files with 33 additions and 21 deletions

View File

@ -20,16 +20,21 @@ GCP_REGION = os.environ.get('GCP_REGION')
POOL_SIZE = int(os.environ.get('DB_POOL_SIZE', '25'))
MAX_OVERFLOW = int(os.environ.get('DB_MAX_OVERFLOW', '10'))
# Initialize Cloud SQL Connector once at module level for GCP environments.
_connector = None
def _get_db_engine():
if GCP_DB_INSTANCE: # GCP environments
def get_db_connection():
global _connector
from google.cloud.sql.connector import Connector
connector = Connector()
if not _connector:
_connector = Connector()
instance_string = f'{GCP_PROJECT}:{GCP_REGION}:{GCP_DB_INSTANCE}'
return connector.connect(
return _connector.connect(
instance_string, 'pg8000', user=DB_USER, password=DB_PASS, db=DB_NAME
)

View File

@ -4,7 +4,7 @@ import asyncio
import logging
import os
from pathlib import Path
from typing import AsyncGenerator
from typing import Any, AsyncGenerator
from fastapi import Request
from pydantic import BaseModel, PrivateAttr, SecretStr, model_validator
@ -42,6 +42,7 @@ class DbSessionInjector(BaseModel, Injector[async_sessionmaker]):
_async_engine: AsyncEngine | None = PrivateAttr(default=None)
_session_maker: sessionmaker | None = PrivateAttr(default=None)
_async_session_maker: async_sessionmaker | None = PrivateAttr(default=None)
_gcp_connector: Any = PrivateAttr(default=None)
@model_validator(mode='after')
def fill_empty_fields(self):
@ -65,14 +66,17 @@ class DbSessionInjector(BaseModel, Injector[async_sessionmaker]):
return self
def _create_gcp_db_connection(self):
# Lazy import because lib does not import if user does not have posgres installed
from google.cloud.sql.connector import Connector
gcp_connector = self._gcp_connector
if gcp_connector is None:
# Lazy import because lib does not import if user does not have posgres installed
from google.cloud.sql.connector import Connector
gcp_connector = Connector()
self._gcp_connector = gcp_connector
connector = Connector()
instance_string = f'{self.gcp_project}:{self.gcp_region}:{self.gcp_db_instance}'
password = self.password
assert password is not None
return connector.connect(
return gcp_connector.connect(
instance_string,
'pg8000',
user=self.user,
@ -81,21 +85,24 @@ class DbSessionInjector(BaseModel, Injector[async_sessionmaker]):
)
async def _create_async_gcp_db_connection(self):
# Lazy import because lib does not import if user does not have posgres installed
from google.cloud.sql.connector import Connector
gcp_connector = self._gcp_connector
if gcp_connector is None:
# Lazy import because lib does not import if user does not have posgres installed
from google.cloud.sql.connector import Connector
loop = asyncio.get_running_loop()
gcp_connector = Connector(loop=loop)
self._gcp_connector = gcp_connector
loop = asyncio.get_running_loop()
async with Connector(loop=loop) as connector:
password = self.password
assert password is not None
conn = await connector.connect_async(
f'{self.gcp_project}:{self.gcp_region}:{self.gcp_db_instance}',
'asyncpg',
user=self.user,
password=password.get_secret_value(),
db=self.name,
)
return conn
password = self.password
assert password is not None
conn = await gcp_connector.connect_async(
f'{self.gcp_project}:{self.gcp_region}:{self.gcp_db_instance}',
'asyncpg',
user=self.user,
password=password.get_secret_value(),
db=self.name,
)
return conn
def _create_gcp_engine(self):
engine = create_engine(