Rohit Malhotra c034cc5dfb
Refactor: move helper function to avoid circular imports (#11310)
Co-authored-by: openhands <openhands@all-hands.dev>
2025-10-10 12:40:03 -04:00

191 lines
6.5 KiB
Python

import hashlib
import hmac
import os
import time
import typing
import jwt
import requests # type: ignore
from fastapi import HTTPException
from server.auth.constants import (
BITBUCKET_APP_CLIENT_ID,
ENABLE_ENTERPRISE_SSO,
ENABLE_JIRA,
ENABLE_JIRA_DC,
ENABLE_LINEAR,
GITHUB_APP_CLIENT_ID,
GITHUB_APP_PRIVATE_KEY,
GITHUB_APP_WEBHOOK_SECRET,
GITLAB_APP_CLIENT_ID,
)
from openhands.core.config.utils import load_openhands_config
from openhands.integrations.service_types import ProviderType
from openhands.server.config.server_config import ServerConfig
from openhands.server.types import AppMode
# Create a function to get config to avoid circular imports
_config = None
def get_config():
global _config
if _config is None:
_config = load_openhands_config()
return _config
def sign_token(payload: dict[str, object], jwt_secret: str, algorithm='HS256') -> str:
"""Signs a JWT token."""
return jwt.encode(payload, jwt_secret, algorithm=algorithm)
def verify_signature(payload: bytes, signature: str):
if not signature:
raise HTTPException(
status_code=403, detail='x-hub-signature-256 header is missing!'
)
expected_signature = (
'sha256='
+ hmac.new(
GITHUB_APP_WEBHOOK_SECRET.encode('utf-8'),
msg=payload,
digestmod=hashlib.sha256,
).hexdigest()
)
if not hmac.compare_digest(expected_signature, signature):
raise HTTPException(status_code=403, detail="Request signatures didn't match!")
class SaaSServerConfig(ServerConfig):
config_cls: str = os.environ.get('OPENHANDS_CONFIG_CLS', '')
app_mode: AppMode = AppMode.SAAS
posthog_client_key: str = os.environ.get('POSTHOG_CLIENT_KEY', '')
github_client_id: str = os.environ.get('GITHUB_APP_CLIENT_ID', '')
enable_billing = os.environ.get('ENABLE_BILLING', 'false') == 'true'
hide_llm_settings = os.environ.get('HIDE_LLM_SETTINGS', 'false') == 'true'
auth_url: str | None = os.environ.get('AUTH_URL')
settings_store_class: str = 'storage.saas_settings_store.SaasSettingsStore'
secret_store_class: str = 'storage.saas_secrets_store.SaasSecretsStore'
conversation_store_class: str = (
'storage.saas_conversation_store.SaasConversationStore'
)
conversation_manager_class: str = os.environ.get(
'CONVERSATION_MANAGER_CLASS',
'server.clustered_conversation_manager.ClusteredConversationManager',
)
monitoring_listener_class: str = (
'server.saas_monitoring_listener.SaaSMonitoringListener'
)
user_auth_class: str = 'server.auth.saas_user_auth.SaasUserAuth'
# Maintenance window configuration
maintenance_start_time: str = os.environ.get(
'MAINTENANCE_START_TIME', ''
) # Timestamp in EST e.g 2025-07-29T14:18:01.219616-04:00
enable_jira = ENABLE_JIRA
enable_jira_dc = ENABLE_JIRA_DC
enable_linear = ENABLE_LINEAR
app_slug: None | str = None
def __init__(self) -> None:
self._get_app_slug()
def _get_app_slug(self):
"""Retrieves the GitHub App slug using the GitHub API's /app endpoint by generating a JWT for the app
Raises:
HTTPException: If the request to the GitHub API fails.
"""
if not GITHUB_APP_CLIENT_ID or not GITHUB_APP_PRIVATE_KEY:
return
# Generate a JWT for the GitHub App
now = int(time.time())
payload = {
'iat': now - 60, # Issued at time (backdate 60 seconds for clock skew)
'exp': now
+ (
9 * 60
), # Expiration time (set to 9 minutes as 10 was causing error if there is time drift)
'iss': GITHUB_APP_CLIENT_ID, # GitHub App ID
}
encoded_jwt = sign_token(payload, GITHUB_APP_PRIVATE_KEY, algorithm='RS256') # type: ignore
# Define the headers for the GitHub API request
headers = {
'Authorization': f'Bearer {encoded_jwt}',
'Accept': 'application/vnd.github+json',
}
# Make a request to the GitHub API /app endpoint
response = requests.get('https://api.github.com/app', headers=headers)
# Check if the response is successful
if response.status_code != 200:
raise ValueError(
f'Failed to retrieve app info, status code:{response.status_code}, message:{response.content.decode('utf-8')}'
)
# Extract the app slug from the response
app_data = response.json()
self.app_slug = app_data.get('slug')
if not self.app_slug:
raise ValueError("GitHub app slug is missing in the API response.'")
def verify_config(self):
if not self.config_cls:
raise ValueError('Config path not provided!')
if not self.posthog_client_key:
raise ValueError('Missing posthog client key in env')
if GITHUB_APP_CLIENT_ID and not self.github_client_id:
raise ValueError('Missing Github client id')
def get_config(self):
# These providers are configurable via helm charts for self hosted deployments
# The FE should have this info so that the login buttons reflect the supported IDPs
providers_configured = []
if GITHUB_APP_CLIENT_ID:
providers_configured.append(ProviderType.GITHUB)
if GITLAB_APP_CLIENT_ID:
providers_configured.append(ProviderType.GITLAB)
if BITBUCKET_APP_CLIENT_ID:
providers_configured.append(ProviderType.BITBUCKET)
if ENABLE_ENTERPRISE_SSO:
providers_configured.append(ProviderType.ENTERPRISE_SSO)
config: dict[str, typing.Any] = {
'APP_MODE': self.app_mode,
'APP_SLUG': self.app_slug,
'GITHUB_CLIENT_ID': self.github_client_id,
'POSTHOG_CLIENT_KEY': self.posthog_client_key,
'FEATURE_FLAGS': {
'ENABLE_BILLING': self.enable_billing,
'HIDE_LLM_SETTINGS': self.hide_llm_settings,
'ENABLE_JIRA': self.enable_jira,
'ENABLE_JIRA_DC': self.enable_jira_dc,
'ENABLE_LINEAR': self.enable_linear,
},
'PROVIDERS_CONFIGURED': providers_configured,
}
# Add maintenance window if configured
if self.maintenance_start_time:
config['MAINTENANCE'] = {
'startTime': self.maintenance_start_time,
}
if self.auth_url:
config['AUTH_URL'] = self.auth_url
return config