Source code for zenossapi.apiclient

# -*- coding: utf-8 -*-

"""
Zenoss API Client Class
"""

import inspect
import os
import urllib3

[docs] class ZenossAPIClientError(Exception): pass
[docs] class ZenossAPIClientAuthenticationError(Exception): pass
[docs] def strtobool(_string): """Converts a string to a boolean, replaces distutils.utils.strtobool (deprecated per PEP 632)""" if _string in ['y', 'yes', 't', 'true', 'on', '1', 1]: return True elif _string in ['n', 'no', 'f', 'false', 'off', '0', 0]: return False else: raise ZenossAPIClientError(f"Invalid boolean value: {_string}")
[docs] class Client(object): """Client class to access the Zenoss JSON API""" def __init__(self, host=None, user=None, password=None, ssl_verify=None, collection_zone=None, api_key=None): """ Create the client object to communicate with Zenoss. The authentication and ssl parameters can be pulled from the environment so that they don't have to be passed in from code or command line args. Arguments: host (str): FQDN used to access the Zenoss server user (str): Zenoss username password (str): Zenoss user's password ssl_verify (bool): Whether to verify the SSL certificate or not. Set to false when using servers with self-signed certs. collection_zone (str): Zenoss Cloud collection zone api_key (str): Zenoss Cloud api key """ if not host: if 'ZENOSS_HOST' in os.environ: host = os.environ['ZENOSS_HOST'] if not user: if 'ZENOSS_USER' in os.environ: user = os.environ['ZENOSS_USER'] if not password: if 'ZENOSS_PASSWD' in os.environ: password = os.environ['ZENOSS_PASSWD'] if not collection_zone: if 'ZENOSS_COLLECTION_ZONE' in os.environ: collection_zone = os.environ['ZENOSS_COLLECTION_ZONE'] if not api_key: if 'ZENOSS_API_KEY' in os.environ: api_key = os.environ['ZENOSS_API_KEY'] if ssl_verify is None: if 'ZENOSS_SSL_VERIFY' in os.environ: ssl_verify = os.environ['ZENOSS_SSL_VERIFY'] else: ssl_verify = True if isinstance(ssl_verify, str): ssl_verify = bool(strtobool(ssl_verify)) if collection_zone: host = '{0}/{1}'.format(host, collection_zone) self.api_url = '{0}/zport/dmd'.format(host) self.ssl_verify = ssl_verify self.api_headers = {"Content-Type": "application/json"} self.routers = dict() for router in self.get_routers(): self.routers[router] = __import__( 'zenossapi.routers.{0}'.format(router), fromlist=[router]) if user and password: self.api_headers.update(urllib3.make_headers( basic_auth='{0}:{1}'.format(user, password))) if api_key: self.api_headers['z-api-key'] = api_key
[docs] def get_routers(self): """ Gets the list of availble Zenoss API routers Returns: list: """ router_list = [] routers_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'routers') file_list = os.listdir(routers_path) file_list = [filename for filename in file_list if '.py' in filename] for fname in file_list: name, ext = fname.split('.') if name == "__init__": continue if ext == "py": router_list.append(name) return router_list
[docs] def get_router(self, router): """ Instantiates and returns a Zenoss router object Arguments: router (str): The API router to use """ router_class = getattr( self.routers[router], '__router__', '{0}Router'.format(router.capitalize()) ) api_router = getattr( self.routers[router], router_class, )( self.api_url, self.api_headers, self.ssl_verify, ) return api_router
[docs] def get_router_methods(self, router): """ List all available methods for an API router Arguments: router (str): The router to get methods from Returns: list: """ router_methods = [] router_class = getattr( self.routers[router], '__router__', '{0}Router'.format(router.capitalize()) ) for method in inspect.getmembers( getattr(self.routers[router], router_class), predicate=inspect.isroutine ): if method[0].startswith('__'): continue router_methods.append(method[0]) return router_methods