Source code for jnpr.healthbot.modules.profiles

from jnpr.healthbot.swagger.models.ca_profile_schema import CaProfileSchema
from jnpr.healthbot.swagger.models.local_certificate_schema import LocalCertificateSchema
from jnpr.healthbot.swagger.models.ssh_key_profile_schema import SshKeyProfileSchema
from jnpr.healthbot.exception import SchemaError, NotFoundError

from jnpr.healthbot.modules import BaseModule

import logging
logger = logging.getLogger(__file__)


[docs]class Profile(object):
[docs] def __init__(self, hbot): """ :param object hbot: :class:`jnpr.healthbot.HealthBotClient` client instance """ self._hbot = hbot self.data_summarization = DataSummarization(hbot) self.security = Security(hbot)
[docs]class Security(object):
[docs] def __init__(self, hbot): """ :param object hbot: :class:`jnpr.healthbot.HealthBotClient` client instance """ self._hbot = hbot self.ca_profile = CaProfile(hbot) self.local_certificate = LocalCertificate(hbot) self.ssh_key_profile = SshKeyProfile(hbot)
[docs]class CaProfile(BaseModule):
[docs] def __init__(self, hbot): """ :param object hbot: :class:`jnpr.healthbot.HealthBotClient` client instance """ super().__init__(hbot)
[docs] def get(self, name: str = None, uncommitted: bool = True): """ Get `CaProfileSchema(s) <jnpr.healthbot.swagger.models.html#caprofileschema>`_ for given ca profile name or list for all :param name: ID of name :param bool uncommitted: True includes fetches uncommitted changes, False restricts data set to only committed changes Example: :: from jnpr.healthbot import HealthBotClient with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx') as hb: print(hb.settings.security.ca_profile.get('xyz') # for all print(hb.settings.security.ca_profile.get() :return: `CaProfileSchema(s) <jnpr.healthbot.swagger.models.html#caprofileschema>`_ """ if name is not None: ca_profile_url = self.hbot.urlfor.ca_profile(name) if uncommitted: ca_profile_url += self.hbot.apiopt_candidate response = self.api.get(ca_profile_url) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return self.hbot._create_schema( response, CaProfileSchema) else: ca_profiles_url = self.hbot.urlfor.ca_profiles() if uncommitted: ca_profiles_url += self.hbot.apiopt_candidate response = self.api.get(ca_profiles_url) response_json = response.json() if response.status_code != 200: logger.error(response.text) raise NotFoundError(response_json) existing_ca_profiles = response_json['ca-profile'] ca_profiles_list = [] for ca_profile in existing_ca_profiles: obj = self.hbot._create_schema(ca_profile, CaProfileSchema) ca_profiles_list.append(obj) return ca_profiles_list
[docs] def add(self, schema: CaProfileSchema = None, **kwargs): """ Add ca profile to HealthBot. The onus of uploading helper file certificate_authority_crt is on user. They should use hb.upload_helper_file API to make sure these crt file are uploaded in system. We don't do that validation as user can also upload these file after configuring profiles. :param object schema: `CaProfileSchema <jnpr.healthbot.swagger.models.html#caprofileschema>`_ :param object kwargs: key values, which can be used to create CaProfileSchema Check `CaProfileSchema <jnpr.healthbot.swagger.models.html#caprofileschema>`_ for details about which all keys can be used Example: :: from jnpr.healthbot import HealthBotClient from jnpr.healthbot import CaProfileSchema with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx') as hb: ca_prof_schema = CaProfileSchema(certificate_authority_crt='abc.crt', name='hbez') hb.settings.security.ca_profile.add(ca_prof_schema) :returns: True when OK """ if schema is None: schema = CaProfileSchema(**kwargs) payload = self.hbot._create_payload(schema) ca_profile_url = self.hbot.urlfor.ca_profile(payload['name']) response = self.api.post(ca_profile_url, json=payload) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return True
[docs] def delete(self, name: str): """ Remove ca profile from security settings :param str name: The name of the ca_profile to be deleted Example: :: hb.settings.security.ca_profile.delete('xyz') hb.commit() :returns: True when OK """ ca_profile_url = self.hbot.urlfor.ca_profile(name) response = self.api.delete(ca_profile_url) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return True
[docs] def update(self, schema: CaProfileSchema = None, **kwargs): """ Update `CaProfileSchema <jnpr.healthbot.swagger.models.html#caprofileschema>`_ for given ca profile schema Passing Schema invoke `put` and kwargs `post` :param obj schema: `CaProfileSchema <jnpr.healthbot.swagger.models.html#caprofileschema>`_ :param object kwargs: key values, which can be used to create CaProfileSchema Check `CaProfileSchema <jnpr.healthbot.swagger.models.html#caprofileschema>`_ for details about which all keys can be used Example: :: from jnpr.healthbot import HealthBotClient with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx') as hb: schemaObj = hb.settings.security.ca_profile.get('xyz') schemaObj.certificate_authority_crt = 'pqr.crt' hb.settings.security.ca_profile.update(schemaObj) :returns: True when OK """ if schema is None: schema = CaProfileSchema(**kwargs) call = self.api.post else: if not isinstance(schema, CaProfileSchema): raise SchemaError(CaProfileSchema) call = self.api.put payload = self.hbot._create_payload(schema) name = schema.name ca_profile_url = self.hbot.urlfor.ca_profile(name) response = call(ca_profile_url, json=payload) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return True
[docs]class LocalCertificate(BaseModule):
[docs] def __init__(self, hbot): """ :param object hbot: :class:`jnpr.healthbot.HealthBotClient` client instance """ super().__init__(hbot)
[docs] def get(self, name: str = None, uncommitted: bool = True): """ Get `LocalCertificateSchema(s) <jnpr.healthbot.swagger.models.html#localcertificateschema>`_ for given local certificate name or list for all :param name: ID of name :param bool uncommitted: True includes fetches uncommitted changes, False restricts data set to only committed changes Example: :: from jnpr.healthbot import HealthBotClient with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx') as hb: print(hb.settings.security.local_certificate.get('xyz') # for all print(hb.settings.security.local_certificate.get() :return: `LocalCertificateSchema(s) <jnpr.healthbot.swagger.models.html#localcertificateschema>`_ """ if name is not None: local_cert_url = self.hbot.urlfor.local_certificate(name) if uncommitted: local_cert_url += self.hbot.apiopt_candidate response = self.api.get(local_cert_url) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return self.hbot._create_schema( response, LocalCertificateSchema) else: local_certs_url = self.hbot.urlfor.local_certificates() if uncommitted: local_certs_url += self.hbot.apiopt_candidate response = self.api.get(local_certs_url) response_json = response.json() if response.status_code != 200: logger.error(response.text) raise NotFoundError(response_json) existing_local_certificates = response_json['local-certificate'] local_certificates_list = [] for local_certificate in existing_local_certificates: obj = self.hbot._create_schema(local_certificate, LocalCertificateSchema) local_certificates_list.append(obj) return local_certificates_list
[docs] def add(self, schema: LocalCertificateSchema = None, **kwargs): """ Add local certificate to security settings of HealthBot. The onus of uploading helper file (cert and key) is on user. They should use hb.upload_helper_file API to make sure these crt/key file are uploaded in system. We don't do that validation as user can also upload these file after configuring profiles. :param object schema: `LocalCertificateSchema <jnpr.healthbot.swagger.models.html#localcertificateschema>`_ :param object kwargs: key values, which can be used to create LocalCertificateSchema Check `LocalCertificateSchema <jnpr.healthbot.swagger.models.html#localcertificateschema>`_ for details about which all keys can be used Example: :: from jnpr.healthbot import HealthBotClient from jnpr.healthbot import LocalCertificateSchema with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx') as hb: local_cert_schema = LocalCertificateSchema(client_crt='abc.crt', client_key='pqr.key', name='hbez') hb.settings.security.local_certificate.add(local_cert_schema) :returns: True when OK """ if schema is None: schema = LocalCertificateSchema(**kwargs) payload = self.hbot._create_payload(schema) local_certificate_url = self.hbot.urlfor.local_certificate(payload['name']) response = self.api.post(local_certificate_url, json=payload) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return True
[docs] def delete(self, name: str): """ Remove local certificate from security settings :param str name: The name of the local_certificate to be deleted Example: :: hb.settings.security.local_certificate.delete('xyz') hb.commit() :returns: True when OK """ local_certificate_url = self.hbot.urlfor.local_certificate(name) response = self.api.delete(local_certificate_url) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return True
[docs] def update(self, schema: LocalCertificateSchema = None, **kwargs): """ Update `LocalCertificateSchema <jnpr.healthbot.swagger.models.html#localcertificateschema>`_ for given local certificate schema Passing Schema invoke `put` and kwargs `post` :param obj schema: `LocalCertificateSchema <jnpr.healthbot.swagger.models.html#localcertificateschema>`_ :param object kwargs: key values, which can be used to create LocalCertificateSchema Check `LocalCertificateSchema <jnpr.healthbot.swagger.models.html#localcertificateschema>`_ for details about which all keys can be used Example: :: from jnpr.healthbot import HealthBotClient with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx') as hb: schemaObj = hb.settings.security.local_certificate.get('xyz') schemaObj.client_key = 'xyz.key' hb.settings.security.local_certificate.update(schemaObj) :returns: True when OK """ if schema is None: schema = LocalCertificateSchema(**kwargs) call = self.api.post else: if not isinstance(schema, LocalCertificateSchema): raise SchemaError(LocalCertificateSchema) call = self.api.put payload = self.hbot._create_payload(schema) name = schema.name local_certificate_url = self.hbot.urlfor.local_certificate(name) response = call(local_certificate_url, json=payload) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return True
[docs]class SshKeyProfile(BaseModule):
[docs] def __init__(self, hbot): """ :param object hbot: :class:`jnpr.healthbot.HealthBotClient` client instance """ super().__init__(hbot)
[docs] def get(self, name: str = None, uncommitted: bool = True): """ Get `SshKeyProfileSchema(s) <jnpr.healthbot.swagger.models.html#sshkeyprofileschema>`_ for given ssh key profile name or list for all :param name: ID of name :param bool uncommitted: True includes fetches uncommitted changes, False restricts data set to only committed changes Example: :: from jnpr.healthbot import HealthBotClient with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx') as hb: print(hb.settings.security.ssh_key_profile.get('xyz') # for all print(hb.settings.security.ssh_key_profile.get() :return: `SshKeyProfileSchema(s) <jnpr.healthbot.swagger.models.html#sshkeyprofileschema>`_ """ if name is not None: ssh_key_profile_url = self.hbot.urlfor.ssh_key_profile(name) if uncommitted: ssh_key_profile_url += self.hbot.apiopt_candidate response = self.api.get(ssh_key_profile_url) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return self.hbot._create_schema( response, SshKeyProfileSchema) else: ssh_key_profiles_url = self.hbot.urlfor.ssh_key_profiles() if uncommitted: ssh_key_profiles_url += self.hbot.apiopt_candidate response = self.api.get(ssh_key_profiles_url) response_json = response.json() if response.status_code != 200: logger.error(response.text) raise NotFoundError(response_json) existing_ssh_key_profiles = response_json['ssh-key-profile'] ssh_key_profiles_list = [] for ssh_key_profile in existing_ssh_key_profiles: obj = self.hbot._create_schema(ssh_key_profile, SshKeyProfileSchema) ssh_key_profiles_list.append(obj) return ssh_key_profiles_list
[docs] def add(self, schema: SshKeyProfileSchema = None, **kwargs): """ Add ssh key profile to HealthBot. The onus of uploading helper file ssh_private_key_file is on user. They should use hb.upload_helper_file API to make sure these key file are uploaded in system. We don't do that validation as user can also upload these file after configuring profiles. :param object schema: `SshKeyProfileSchema <jnpr.healthbot.swagger.models.html#sshkeyprofileschema>`_ :param object kwargs: key values, which can be used to create SshKeyProfileSchema Check `SshKeyProfileSchema <jnpr.healthbot.swagger.models.html#sshkeyprofileschema>`_ for details about which all keys can be used Example: :: from jnpr.healthbot import HealthBotClient from jnpr.healthbot import SshKeyProfileSchema with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx') as hb: ssh_key_prof_schema = SshKeyProfileSchema(name='hbez', ssh_private_key_file='abc.crt', ssh_private_key_passphrase='%$#@#') hb.settings.security.ssh_key_profile.add(ssh_key_prof_schema) :returns: True when OK """ if schema is None: schema = SshKeyProfileSchema(**kwargs) payload = self.hbot._create_payload(schema) ssh_key_profile_url = self.hbot.urlfor.ssh_key_profile(payload['name']) response = self.api.post(ssh_key_profile_url, json=payload) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return True
[docs] def delete(self, name: str): """ Remove ssh key profile from security settings :param str name: The name of the ssh key profile to be deleted Example: :: hb.settings.security.ssh_key_profile.delete('xyz') hb.commit() :returns: True when OK """ ssh_key_profile_url = self.hbot.urlfor.ssh_key_profile(name) response = self.api.delete(ssh_key_profile_url) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return True
[docs] def update(self, schema: SshKeyProfileSchema = None, **kwargs): """ Update `SshKeyProfileSchema <jnpr.healthbot.swagger.models.html#sshkeyprofileschema>`_ for given ssh key profile schema Passing Schema invoke `put` and kwargs `post` :param obj schema: `SshKeyProfileSchema <jnpr.healthbot.swagger.models.html#sshkeyprofileschema>`_ :param object kwargs: key values, which can be used to create SshKeyProfileSchema Check `SshKeyProfileSchema <jnpr.healthbot.swagger.models.html#sshkeyprofileschema>`_ for details about which all keys can be used Example: :: from jnpr.healthbot import HealthBotClient with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx') as hb: schemaObj = hb.settings.security.ssh_key_profile.get('xyz') schemaObj.certificate_authority_crt = 'pqr.crt' hb.settings.security.ssh_key_profile.update(schemaObj) :returns: True when OK """ if schema is None: schema = SshKeyProfileSchema(**kwargs) call = self.api.post else: if not isinstance(schema, SshKeyProfileSchema): raise SchemaError(SshKeyProfileSchema) call = self.api.put payload = self.hbot._create_payload(schema) name = schema.name ssh_key_profile_url = self.hbot.urlfor.ssh_key_profile(name) response = call(ssh_key_profile_url, json=payload) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return True
[docs]class DataSummarization(BaseModule):
[docs] def __init__(self, hbot): """ :param object hbot: :class:`jnpr.healthbot.HealthBotClient` client instance """ super().__init__(hbot) self.raw = Raw(hbot)
[docs]class Raw(BaseModule):
[docs] def __init__(self, hbot): """ :param object hbot: :class:`jnpr.healthbot.HealthBotClient` client instance """ super().__init__(hbot)
[docs] def get(self): pass
[docs] def add(self): pass
[docs] def delete(self, name: str): pass
[docs] def update(self): pass