Merge "typing: Annotate openstack.config.openstackcloud"
This commit is contained in:
commit
82fd250eac
@ -22,6 +22,7 @@ import weakref
|
||||
|
||||
import dogpile.cache
|
||||
import keystoneauth1.exceptions
|
||||
from keystoneauth1.identity import base as ks_plugin_base
|
||||
import requests.models
|
||||
import requestsexceptions
|
||||
import typing_extensions as ty_ext
|
||||
@ -39,6 +40,8 @@ from openstack import utils
|
||||
from openstack import warnings as os_warnings
|
||||
|
||||
if ty.TYPE_CHECKING:
|
||||
from dogpile.cache import region as cache_region
|
||||
from keystoneauth1.access import service_catalog as ks_service_catalog
|
||||
from keystoneauth1 import session as ks_session
|
||||
from oslo_config import cfg
|
||||
|
||||
@ -89,7 +92,7 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
|
||||
strict_proxies: bool = False,
|
||||
pool_executor: concurrent.futures.Executor | None = None,
|
||||
**kwargs: ty.Any,
|
||||
):
|
||||
) -> None:
|
||||
"""Create a connection to a cloud.
|
||||
|
||||
A connection needs information about how to connect, how to
|
||||
@ -270,13 +273,13 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
|
||||
atexit.register(self.close)
|
||||
|
||||
@property
|
||||
def session(self):
|
||||
def session(self) -> 'ks_session.Session':
|
||||
if not self._session:
|
||||
self._session = self.config.get_session()
|
||||
# Hide a reference to the connection on the session to help with
|
||||
# backwards compatibility for folks trying to just pass
|
||||
# conn.session to a Resource method's session argument.
|
||||
self.session._sdk_connection = weakref.proxy(self)
|
||||
setattr(self.session, '_sdk_connection', weakref.proxy(self))
|
||||
return self._session
|
||||
|
||||
@property
|
||||
@ -308,7 +311,7 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
|
||||
def set_global_request_id(self, global_request_id: str) -> None:
|
||||
self._global_request_id = global_request_id
|
||||
|
||||
def global_request(self, global_request_id):
|
||||
def global_request(self, global_request_id: str) -> ty_ext.Self:
|
||||
"""Make a new Connection object with a global request id set.
|
||||
|
||||
Take the existing settings from the current Connection and construct a
|
||||
@ -358,14 +361,21 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
|
||||
new_conn.set_global_request_id(global_request_id)
|
||||
return new_conn
|
||||
|
||||
def _make_cache(self, cache_class, expiration_time, arguments):
|
||||
def _make_cache(
|
||||
self,
|
||||
cache_class: str,
|
||||
expiration_time: int,
|
||||
arguments: dict[str, ty.Any] | None,
|
||||
) -> 'cache_region.CacheRegion':
|
||||
return dogpile.cache.make_region(
|
||||
function_key_generator=self._make_cache_key
|
||||
).configure(
|
||||
cache_class, expiration_time=expiration_time, arguments=arguments
|
||||
)
|
||||
|
||||
def _make_cache_key(self, namespace, fn):
|
||||
def _make_cache_key(
|
||||
self, namespace: str, fn: ty.Callable[..., ty.Any]
|
||||
) -> ty.Callable[..., str]:
|
||||
fname = fn.__name__
|
||||
if namespace is None:
|
||||
name_key = self.name
|
||||
@ -384,7 +394,7 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
|
||||
|
||||
return generate_key
|
||||
|
||||
def pprint(self, resource):
|
||||
def pprint(self, resource: object) -> None:
|
||||
"""Wrapper around pprint that groks munch objects"""
|
||||
# import late since this is a utility function
|
||||
import pprint
|
||||
@ -392,7 +402,7 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
|
||||
new_resource = _utils._dictify_resource(resource)
|
||||
pprint.pprint(new_resource)
|
||||
|
||||
def pformat(self, resource):
|
||||
def pformat(self, resource: object) -> str:
|
||||
"""Wrapper around pformat that groks munch objects"""
|
||||
# import late since this is a utility function
|
||||
import pprint
|
||||
@ -401,26 +411,50 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
|
||||
return pprint.pformat(new_resource)
|
||||
|
||||
@property
|
||||
def _keystone_catalog(self):
|
||||
def _keystone_catalog(self) -> 'ks_service_catalog.ServiceCatalog':
|
||||
if self.session.auth is None:
|
||||
raise exceptions.ConfigException(
|
||||
'session has no auth information attached'
|
||||
)
|
||||
|
||||
if not isinstance(
|
||||
self.session.auth, ks_plugin_base.BaseIdentityPlugin
|
||||
):
|
||||
raise exceptions.ConfigException(
|
||||
'cannot fetch catalog for non-keystone auth plugin'
|
||||
)
|
||||
|
||||
return self.session.auth.get_access(self.session).service_catalog
|
||||
|
||||
@property
|
||||
def service_catalog(self):
|
||||
def service_catalog(self) -> list[dict[str, ty.Any]]:
|
||||
return self._keystone_catalog.catalog
|
||||
|
||||
@property
|
||||
def auth_token(self):
|
||||
def auth_token(self) -> str | None:
|
||||
# Keystone's session will reuse a token if it is still valid.
|
||||
# We don't need to track validity here, just get_token() each time.
|
||||
return self.session.get_token()
|
||||
|
||||
@property
|
||||
def current_user_id(self):
|
||||
def current_user_id(self) -> str | None:
|
||||
"""Get the id of the currently logged-in user from the token."""
|
||||
if self.session.auth is None:
|
||||
raise exceptions.ConfigException(
|
||||
'session has no auth information attached'
|
||||
)
|
||||
|
||||
if not isinstance(
|
||||
self.session.auth, ks_plugin_base.BaseIdentityPlugin
|
||||
):
|
||||
raise exceptions.ConfigException(
|
||||
'cannot fetch catalog for non-keystone auth plugin'
|
||||
)
|
||||
|
||||
return self.session.auth.get_access(self.session).user_id
|
||||
|
||||
@property
|
||||
def current_project_id(self):
|
||||
def current_project_id(self) -> str | None:
|
||||
"""Get the current project ID.
|
||||
|
||||
Returns the project_id of the current token scope. None means that
|
||||
@ -434,11 +468,11 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
|
||||
return self.session.get_project_id()
|
||||
|
||||
@property
|
||||
def current_project(self):
|
||||
def current_project(self) -> utils.Munch:
|
||||
"""Return a ``utils.Munch`` describing the current project"""
|
||||
return self._get_project_info()
|
||||
|
||||
def _get_project_info(self, project_id=None):
|
||||
def _get_project_info(self, project_id: str | None = None) -> utils.Munch:
|
||||
project_info = utils.Munch(
|
||||
id=project_id,
|
||||
name=None,
|
||||
@ -464,11 +498,15 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
|
||||
return project_info
|
||||
|
||||
@property
|
||||
def current_location(self):
|
||||
def current_location(self) -> utils.Munch:
|
||||
"""Return a ``utils.Munch`` explaining the current cloud location."""
|
||||
return self._get_current_location()
|
||||
|
||||
def _get_current_location(self, project_id=None, zone=None):
|
||||
def _get_current_location(
|
||||
self,
|
||||
project_id: str | None = None,
|
||||
zone: str | None = None,
|
||||
) -> utils.Munch:
|
||||
return utils.Munch(
|
||||
cloud=self.name,
|
||||
# TODO(efried): This is wrong, but it only seems to be used in a
|
||||
@ -538,14 +576,21 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
|
||||
data = proxy._json_response(data)
|
||||
return meta.get_and_munchify(key, data)
|
||||
|
||||
def get_name(self):
|
||||
def get_name(self) -> str:
|
||||
return self.name
|
||||
|
||||
def get_session_endpoint(self, service_key, **kwargs):
|
||||
if not kwargs:
|
||||
kwargs = {}
|
||||
def get_session_endpoint(
|
||||
self,
|
||||
service_key: str,
|
||||
min_version: str | None = None,
|
||||
max_version: str | None = None,
|
||||
) -> str | None:
|
||||
try:
|
||||
return self.config.get_session_endpoint(service_key, **kwargs)
|
||||
return self.config.get_session_endpoint(
|
||||
service_key,
|
||||
min_version=min_version,
|
||||
max_version=max_version,
|
||||
)
|
||||
except keystoneauth1.exceptions.catalog.EndpointNotFound as e:
|
||||
self.log.debug(
|
||||
"Endpoint not found in %s cloud: %s", self.name, str(e)
|
||||
@ -560,7 +605,9 @@ class _OpenStackCloudMixin(_services_mixin.ServicesMixin):
|
||||
)
|
||||
return endpoint
|
||||
|
||||
def has_service(self, service_key, version=None):
|
||||
def has_service(
|
||||
self, service_key: str, version: str | None = None
|
||||
) -> bool:
|
||||
if not self.config.has_service(service_key):
|
||||
# TODO(mordred) add a stamp here so that we only report this once
|
||||
if not (
|
||||
|
@ -33,12 +33,8 @@ module = [
|
||||
"openstack._log",
|
||||
"openstack.common",
|
||||
"openstack.common.*",
|
||||
"openstack.config._util",
|
||||
"openstack.config.defaults",
|
||||
"openstack.config.cloud_config",
|
||||
"openstack.config.cloud_region",
|
||||
"openstack.config.loader",
|
||||
"openstack.config.vendors",
|
||||
"openstack.config",
|
||||
"openstack.config.*",
|
||||
"openstack.connection",
|
||||
"openstack.exceptions",
|
||||
"openstack.fields",
|
||||
|
Loading…
x
Reference in New Issue
Block a user