Source code for jnpr.healthbot.healthbot

import requests
from requests.models import Response
from requests.exceptions import HTTPError
import logging

import re
import os
import time
import jwt
from jnpr.healthbot.exception import ConnectAuthError
from jnpr.healthbot.modules import devices
from jnpr.healthbot.modules import rules
from jnpr.healthbot.modules import playbooks
from jnpr.healthbot.modules import database
from jnpr.healthbot.modules import settings
from jnpr.healthbot.modules import profiles
from jnpr.healthbot.modules import graph
from jnpr.healthbot.modules import BaseModule

from jnpr.healthbot.swagger.api.authentication_api import AuthenticationApi
from jnpr.healthbot.swagger.api_client import ApiClient
from jnpr.healthbot.swagger.configuration import Configuration
from jnpr.healthbot.swagger.models.health_schema import HealthSchema
from jnpr.healthbot.swagger.models.refresh_token import RefreshToken
from jnpr.healthbot.swagger.models.token import Token

from jnpr.healthbot.swagger.api.configuration_api import ConfigurationApi
from jnpr.healthbot.swagger.api.default_api import DefaultApi
from jnpr.healthbot.swagger.api.authentication_api import AuthenticationApi
from jnpr.healthbot.swagger.api.data_source_api import DataSourceApi
from jnpr.healthbot.swagger.api.data_store_api import DataStoreApi
from jnpr.healthbot.swagger.api.debug_api import DebugApi
from jnpr.healthbot.swagger.api.instance_schedule_state_api import InstanceScheduleStateApi
from jnpr.healthbot.swagger.api.license_api import LicenseApi
from jnpr.healthbot.swagger.api.services_api import ServicesApi
from jnpr.healthbot.swagger.api.system_api import SystemApi

from jnpr.healthbot.swagger.rest import ApiException
from jnpr.healthbot.urlfor import UrlFor

from pathlib import Path

import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


logger = logging.getLogger(__file__)


[docs]class HealthBotClient(object): apiopt_candidate = "/?working=true"
[docs] def __init__( self, server: str, user: str, password: str, *args, **kwargs): """ An instance of this class represents the HealthBot Service :param str server: HealthBot Server IP Address :param str user: HealthBot Server (not the Linux user) UserName :param str password: HealthBot Server (not the Linux user) password :param int port: *OPTIONAL* HealthBot Server port (defaults to 8080) Example: :: from jnpr.healthbot import HealthBotClient from pprint import pprint with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx', port=8000) as hb: # Get list of all existing devices print(hb.device.get_ids()) # Get config details of a given device id pprint(hb.device.get('core-01')) # Get config details of all the device pprint(hb.device.get()) # Get device facts of a given device id pprint(hb.device.get_facts('avro')) # Get device facts for all the devices in HB pprint(hb.device.get_facts()) # Add a device from jnpr.healthbot import DeviceSchema ds = DeviceSchema(device_id='xyz', host='xx.xxx.xxx.xxx', authentication={"password": {"password": "xxxxx", "username": "xxxxx"}}) # we can also later assign values like this ds.description = "HbEZ testing" # This will add device in candidate DB hb.device.add(schema=ds) # Add device group print(hb.device_group.add(device_group_name="edge", description="All devices on the edge", devices=['demo'])) # commit changes to master DB hb.commit() # get details of a given topic/rule pprint(hb.rule.get('linecard.ospf', 'check-ddos-statistics')) """ self.server = server self.user = user self.password = password self._version = "" self.port = kwargs.get('port', 8080) if server is None or server == "": raise ValueError("You must provide 'server' of HealthBot") if user is None or user == "": raise ValueError("You must provide 'user' of HealthBot") if password is None or password == "": raise ValueError("You must provide 'password' of HealthBot") self._hbot_session = None self._user_token = None self.api_client = None self._auth = None self.connected = False self._token_expire_time = None
[docs] def open(self): """ Open session with HealthBot server. First sets user token (for healthbot 3.0.0 and above) and check a top level URL for confirmation of API to be working. """ # set_user_token returning False can be the case for older Healthbot if not self.set_user_token(): try: # Call one dummy URL to check if login was successful # needed for <3.0 HealthBots req = self.hbot_session.get(self.url + '/device') # HTTP errors are not raised by default, this statement does that req.raise_for_status() self.connected = True except HTTPError as ex: logger.error("Check user credentials") raise ConnectAuthError(self, "Check user credentials") except Exception as ex: logger.error("Error: {}".format(ex)) raise ex else: logger.debug( "Connected to HealthBot Server({}) on port {}".format( self.server, self.port)) self._version = self.version self.urlfor = UrlFor(self) self.device = devices.Device(self) self.device_group = devices.DeviceGroup(self) self.network_group = devices.NetworkGroup(self) self.rule = rules.Rule(self) self.topic = rules.Topic(self) self.playbook = playbooks.Playbook(self) self.settings = settings.Settings(self) self.profile = profiles.Profile(self) self.charts = graph.HBCharts(self) config_bm = BaseModule(self, self.config_url) self.authorization = config_bm.authorization self.configuration = ConfigurationApi(config_bm.api_client) self.data_store = DataStoreApi(config_bm.api_client) self.services = ServicesApi(config_bm.api_client) self.default = DefaultApi(config_bm.api_client) self.instance_schedule = InstanceScheduleStateApi(config_bm.api_client) non_config_bm = BaseModule(self, self.url) self.license = LicenseApi(non_config_bm.api_client) self.data_source = DataSourceApi(non_config_bm.api_client) self.system = SystemApi(non_config_bm.api_client) self.debug = DebugApi(non_config_bm.api_client) return self
login = open
[docs] def set_user_token(self): """ From HealthBot 3.0.0 APIs will be token based. This function helps in setting user based token. This token will be used in header of any REST API calls. """ conf = Configuration() conf.host = self.url conf.verify_ssl = False self.api_client = ApiClient(configuration=conf) self._auth = AuthenticationApi(api_client=self.api_client) try: self._user_token = self._auth.user_login( credential={"userName": self.user, "password": self.password}) self._token_expire_time = time.time() + \ int(self._user_token.token_expires) self.hbot_session.headers.update({ 'x-iam-token': self._user_token.access_token}) self.connected = True except ApiException as ex: logger.debug("Check if given HealthBot version support authorization key") # set user/password used by older healthbot version APIs self.hbot_session.auth = (self.user, self.password) return False except Exception as ex: logger.error("User Login Error: {}".format(ex)) raise ex return True
@property def tenant(self): raw_data = jwt.decode(self._user_token.access_token, options={"verify_aud": False, "verify_signature": False, "verify_nbf": False}) return raw_data['scope']['Name'] @property def hbot_session(self): """ Property provides requests module session object. Also help in updating Access token key when expires. Any call to hbot_session.apis should go through this property to keep a check on token key expiry. """ if self._hbot_session is None: self._hbot_session = requests.Session() self._hbot_session.headers.update( {"Accept": "application/json", "Content-Type": "application/json"} ) self._hbot_session.verify = False # this call should make sure to check for token expiry self.user_token return self._hbot_session @property def user_token(self): if self._token_expire_time and time.time() >= self._token_expire_time: logger.debug("Token expired, hence refreshing") obj = self._auth.refresh_token(token=Token( refresh_token=self._user_token.refresh_token)) self._user_token.access_token = obj.access_token self._user_token.refresh_token = obj.refresh_token self._hbot_session.headers.update({ 'x-iam-token': self._user_token.access_token}) self._token_expire_time = time.time() + \ int(self._user_token.token_expires) return self._user_token
[docs] def logout(self): """ Call user logout function to discard access tokens. """ if self._user_token: refresh_token = RefreshToken(token=self._user_token.refresh_token) self._auth.user_logout(refresh_token=refresh_token)
@property def tsdb(self): """ Connection to the tsdb :return: InfluxDBClient """ return database.Database(self) @tsdb.setter def tsdb(self, value): """ read-only property """ raise RuntimeError("tsdb is read-only!") @property def config_url(self): """ This function will give config URL """ return self.url + "/config" @property def url(self): """ Initials of URL to be used for API call. :returns: str: Initials of URL to be used for API call. """ url_initials = "https://" + self.server + ":" + str(self.port) + "/api/" return url_initials + "v2" @property def grafana_url(self): return "https://" + self.server + ":" + str(self.port) + "/grafana/api" @property def version(self): """ TO get the version of Healthbot Server :returns: str: API server version. """ if self._version == "": system_details_url = "{api}/system-details".format(api=self.url) resp = self.api.get(system_details_url) if resp.status_code == 200: hb_version = resp.json().get('version', '') obj = re.search(r'HealthBot (\d\.\d\.\d)', hb_version) if obj: self._version = obj.group(1) # to refresh user token to use v2, as we started with v1 # this should be cleaned/deleted in future. self.api_client.configuration.host = self.url return self._version @property def api(self): return self.hbot_session
[docs] def commit(self): """ Commit any candidate configuration Example: :: from jnpr.healthbot import HealthBotClient from jnpr.healthbot import DeviceSchema with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx') as hb: ds = DeviceSchema(device_id='xyz', host='xx.xxx.xxx.xxx', authentication={"password": {"password": "xxxxx", "username": "xxxxx"}}) # we can also later assign values like this ds.description = "HbEZ testing" # This will add device in candidate DB hb.device.add(schema=ds) # commit changes to master DB hb.commit() :raises: Any requests exception :returns: True when OK """ response = self.hbot_session.post( "{api}/configuration".format(api=self.config_url)) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return response.ok
[docs] def rollback(self): """ Rollback any candidate configuration Example: :: from jnpr.healthbot import HealthBotClient with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx') as hb: # This will delete device in candidate DB hb.device.delete('xyz') # rollback candidate configuration hb.rollback() :raises: Any requests exception :returns: True when OK """ response = self.hbot_session.delete( "{api}/configuration".format(api=self.url)) if response.status_code != 200: logger.error(response.text) response.raise_for_status() return response.ok
[docs] def upload_helper_file(self, filename): """ Upload a "helper-file" to the server. A helper-file, cab be YAML/.py/.rule/.playbook file. :param str filename: The name of the file to be uploaded. """ # when using the 'files' option to POST, we must remove the content-type from the # headers since the requests library uses/fills this in according to # the file c_type = self.hbot_session.headers.pop('Content-Type') try: file_p = Path(os.path.abspath(filename)) # upload rule/playbook file if filename.endswith('.rule'): url = self.urlfor.topics() files = {'topics': file_p.open('rb')} elif filename.endswith('.playbook'): url = self.urlfor.playbooks() files = {'playbooks': file_p.open('rb')} else: url = self.urlfor.helper_files(name=file_p.name) files = {'up_file': file_p.open('rb')} response = self.hbot_session.post(url=url, files=files) if response.status_code != 200: logger.error(response.text) response.raise_for_status() finally: self.hbot_session.headers['Content-Type'] = c_type return True
def _create_schema(self, response: requests.models.Response, schema): attribute_map = schema.attribute_map if not isinstance(response, dict): attributes = dict() if isinstance(response, Response): response = response.json() attribute_map_values = attribute_map.values() attribute_map_reversed = {v: k for k, v in attribute_map.items()} for key, value in response.items(): if key in attribute_map_values: attributes[attribute_map_reversed[key]] = response.get(key) else: response = response.to_dict() for key, value in response.items(): if key in attribute_map: attributes[key] = response.get(key) return schema(**attributes) else: attributes = dict() attribute_map_values = attribute_map.values() attribute_map_reversed = {v: k for k, v in attribute_map.items()} for key, value in response.items(): if key in attribute_map_values: attributes[attribute_map_reversed[key]] = response.get(key) return schema(**attributes) def _create_payload(self, schemaObj): """ remove keys which got None as value :param payload: dict which needs clean up of None values :return: dict of payload without None values """ try: payload = schemaObj.to_dict() except AttributeError: if isinstance(schemaObj, dict): return schemaObj else: raise # remove None values attribute_map = schemaObj.attribute_map attributes = dict() for key, value in payload.items(): if value is not None: if key in attribute_map: if isinstance(value, dict): childSchema = getattr(schemaObj, key) if isinstance(childSchema, dict): # childSchema is not any schema, hence set it to # payload attributes[attribute_map[key]] = payload.get(key) else: attributes[attribute_map[key] ] = self._create_payload(childSchema) elif isinstance(value, list) and filter( lambda i: isinstance(i, dict), value): childSchemas = getattr(schemaObj, key) attributes[attribute_map[key]] = [] for childSchema in childSchemas: if isinstance(childSchema, str): child_val = childSchema else: child_val = self._create_payload(childSchema) attributes[attribute_map[key]].append(child_val) else: attributes[attribute_map[key]] = payload.get(key) return attributes
[docs] def health(self): """ Returns health of network-groups and devices in device-groups `HealthSchema <jnpr.healthbot.swagger.models.html#healthschema>`_ Example: :: from jnpr.healthbot import HealthBotClient with HealthBotClient('xx.xxx.x.xx', 'xxxx', 'xxxx') as hb: print(hb.health()) :return: `HealthSchema <jnpr.healthbot.swagger.models.html#healthschema>`_ """ health_url = self.urlfor.health() resp = self.api.get(health_url) if resp.status_code == 404: return {} return self._create_schema(resp, HealthSchema)
[docs] def close(self): self.hbot_session.close() self.connected = False
def __repr__(self): return "HealthBot(%s)" % self.server # ----------------------------------------------------------------------- # Context Manager # ----------------------------------------------------------------------- def __enter__(self): return self.login() def __exit__(self, exc_type, exc_val, exc_tb): self.logout() self.connected = False def __repr__(self): return "HealthBot(%s)" % self.server