import ast
import logging
import json
import functools
import re
import traceback
import requests
from datetime import datetime
from urllib.parse import urljoin, urlsplit, urlunsplit, SplitResult
from opentracing_utils import trace, extract_span_from_kwargs
from zmon_cli import __version__
from zmon_cli.config import DEFAULT_TIMEOUT
# Version segment used when building the API root URL in Zmon.__init__.
API_VERSION = 'v1'

# Default value of the ``user_agent`` argument of Zmon.__init__; embeds the zmon_cli version.
ZMON_USER_AGENT = 'zmon-client/{}'.format(__version__)

# Path fragments passed to Zmon.endpoint() and resolved against the API root URL.
ACTIVE_ALERT_DEF = 'checks/all-active-alert-definitions'
ACTIVE_CHECK_DEF = 'checks/all-active-check-definitions'
ALERT_DATA = 'status/alert'
ALERT_DEF = 'alert-definitions'
CHECK_DEF = 'check-definitions'
DASHBOARD = 'dashboard'
DOWNTIME = 'downtimes'
ENTITIES = 'entities'
GRAFANA = 'visualization/dashboards'
GROUPS = 'groups'
MEMBER = 'member'
PHONE = 'phone'
SEARCH = 'quick-search'
STATUS = 'status'
TOKENS = 'onetime-tokens'

# Path fragments used by the deeplink helpers; those resolve against the bare base URL
# (scheme + host) rather than the API root.
ALERT_DETAILS_VIEW_URL = '#/alert-details/'
CHECK_DEF_VIEW_URL = '#/check-definitions/view/'
DASHBOARD_VIEW_URL = '#/dashboards/views/'
GRAFANA_DASHBOARD_URL = 'visualization/dashboard/'
TOKEN_LOGIN_URL = 'tv/'

logger = logging.getLogger(__name__)

# Matches a run of one or more '(' or a run of one or more ')'.
parentheses_re = re.compile('[(]+|[)]+')
# Matches a run of characters outside the set accepted in entity IDs:
# ASCII letters, digits and the characters - @ _ . [ ] :
invalid_entity_id_re = re.compile('[^a-zA-Z0-9-@_.\\[\\]\\:]+')
class JSONDateEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``datetime`` objects as ISO-8601 strings."""

    def default(self, obj):
        """Return ``obj.isoformat()`` for datetimes; defer to the base encoder for anything else."""
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)
class ZmonError(Exception):
    """ZMON client error.

    The supplied message is prefixed with ``ZMON client error: ``.
    """

    def __init__(self, message=''):
        prefixed = 'ZMON client error: {}'.format(message)
        super().__init__(prefixed)
class ZmonArgumentError(ZmonError):
    """ZMON client error raised when a supplied object has missing or invalid attributes."""
def logged(f):
    """Decorate ``f`` so that any exception it raises is logged with the function name, then re-raised.

    :param f: Callable to wrap.
    :return: Wrapper carrying the metadata of ``f`` (via ``functools.wraps``).
    """
    def wrapper(*args, **kwargs):
        try:
            result = f(*args, **kwargs)
        except Exception:
            # Only record where the failure happened; the exception itself still propagates.
            logger.error('ZMON client failed in: {}'.format(f.__name__))
            raise
        return result

    return functools.wraps(f)(wrapper)
def compare_entities(e1, e2):
    """Return ``True`` when two entities are equal once ``last_modified`` is ignored.

    Both entities are compared after a JSON round trip (using ``JSONDateEncoder``). Any failure
    while copying or (de)serializing is logged and reported as *not equal*.

    :param e1: First entity.
    :type e1: dict
    :param e2: Second entity.
    :type e2: dict
    :return: ``True`` if the entities match, ``False`` otherwise or on error.
    :rtype: bool
    """
    def normalized(entity):
        # Work on a shallow copy so the caller's object keeps its ``last_modified`` key.
        clone = entity.copy()
        clone.pop('last_modified', None)
        return json.loads(json.dumps(clone, cls=JSONDateEncoder))

    try:
        return normalized(e1) == normalized(e2)
    except Exception:
        # We failed during json serialization/deserialization, fall back to *not-equal*!
        logger.exception('Failed in `compare_entities`')
        return False
def get_valid_entity_id(e):
    """Return a sanitized, lower-cased entity ID derived from ``e``.

    Each run of ``(`` becomes a single ``[`` and each run of ``)`` a single ``]``; afterwards every
    run of characters matched by ``invalid_entity_id_re`` is replaced with a single ``-``.

    :param e: Raw entity ID.
    :type e: str
    :return: Sanitized entity ID.
    :rtype: str
    """
    def to_bracket(match):
        return '[' if '(' in match.group() else ']'

    bracketed = parentheses_re.sub(to_bracket, e.lower())
    return invalid_entity_id_re.sub('-', bracketed)
[docs]class Zmon:
"""ZMON client class that enables communication with ZMON backend.
:param url: ZMON backend base url.
:type url: str
:param token: ZMON authentication token.
:type token: str
:param username: ZMON authentication username. Ignored if ``token`` is used.
:type username: str
:param password: ZMON authentication password. Ignored if ``token`` is used.
:type password: str
:param timeout: HTTP requests timeout. Default is 10 sec.
:type timeout: int
:param verify: Verify SSL connection. Default is ``True``.
:type verify: bool
:param user_agent: ZMON user agent. Default is generated by ZMON client and includes lib version.
:type user_agent: str
"""
def __init__(
        self, url, token=None, username=None, password=None, timeout=DEFAULT_TIMEOUT, verify=True,
        user_agent=ZMON_USER_AGENT):
    """Initialize ZMON client."""
    self.timeout = timeout
    self._timeout = timeout
    self.user_agent = user_agent

    # Only scheme and host of the supplied URL are kept; the API root is derived from them.
    parsed = urlsplit(url)
    self.base_url = urlunsplit(SplitResult(parsed.scheme, parsed.netloc, '', '', ''))
    self.url = urljoin(self.base_url, self._join_path(['api', API_VERSION, '']))

    http = requests.Session()

    # Basic auth is used only when no token was supplied.
    if username and password and token is None:
        http.auth = (username, password)

    http.headers.update({'User-Agent': user_agent, 'Content-Type': 'application/json'})

    if token:
        http.headers.update({'Authorization': 'Bearer {}'.format(token)})

    if not verify:
        logger.warning('ZMON client will skip SSL verification!')
        requests.packages.urllib3.disable_warnings()
        http.verify = False

    self._session = http
@property
def session(self):
return self._session
@staticmethod
def is_valid_entity_id(entity_id):
    """Return ``True`` when ``entity_id`` contains no character matched by ``invalid_entity_id_re``.

    :param entity_id: Entity ID to check.
    :type entity_id: str
    :rtype: bool
    """
    offending = invalid_entity_id_re.search(entity_id)
    return offending is None
[docs] @staticmethod
def validate_check_command(src):
"""
Validates if ``check command`` is valid syntax. Raises exception in case of invalid syntax.
:param src: Check command python source code.
:type src: str
:raises: ZmonError
"""
try:
ast.parse(src)
except Exception as e:
raise ZmonError('Invalid check command: {}'.format(e))
def _join_path(self, parts):
return '/'.join(str(p).strip('/') for p in parts)
def endpoint(self, *args, trailing_slash=True, base_url=None):
parts = list(args)
# Ensure trailing slash!
if trailing_slash:
parts.append('')
url = self.url if not base_url else base_url
return urljoin(url, self._join_path(parts))
def json(self, resp):
resp.raise_for_status()
return resp.json()
########################################################################################################################
# DEEPLINKS
########################################################################################################################
def check_definition_url(self, check_definition: dict) -> str:
    """
    Build the ZMON UI deeplink for a check definition.

    :param check_definition: Check definition dict; its ``id`` is used.
    :type check_definition: dict

    :return: Deeplink to check definition view.
    :rtype: str
    """
    check_id = check_definition['id']
    return self.endpoint(CHECK_DEF_VIEW_URL, check_id, base_url=self.base_url)
def alert_details_url(self, alert: dict) -> str:
    """
    Build the ZMON UI deeplink for an alert's details view.

    :param alert: Alert dict; its ``id`` is used.
    :type alert: dict

    :return: Deeplink to alert details view.
    :rtype: str
    """
    alert_id = alert['id']
    return self.endpoint(ALERT_DETAILS_VIEW_URL, alert_id, base_url=self.base_url)
def dashboard_url(self, dashboard_id: int) -> str:
    """
    Build the ZMON UI deeplink for a dashboard.

    :param dashboard_id: ZMON Dashboard ID.
    :type dashboard_id: int

    :return: Deeplink to dashboard.
    :rtype: str
    """
    link = self.endpoint(DASHBOARD_VIEW_URL, dashboard_id, base_url=self.base_url)
    return link
def token_login_url(self, token: str) -> str:
    """
    Build the deeplink for a ZMON one-time login.

    :param token: One-time token.
    :type token: str

    :return: Deeplink to ZMON one-time login.
    :rtype: str
    """
    link = self.endpoint(TOKEN_LOGIN_URL, token, base_url=self.base_url)
    return link
def grafana_dashboard_url(self, dashboard: dict) -> str:
    """
    Build the deeplink for a Grafana dashboard.

    :param dashboard: Grafana dashboard dict.
    :type dashboard: dict

    :return: Deeplink to Grafana dashboard, or an empty string when the dict has no truthy ``id``.
    :rtype: str
    """
    if not dashboard.get('id', None):
        return ""

    # Unlike the other deeplinks, this one is built without a trailing slash.
    return self.endpoint(GRAFANA_DASHBOARD_URL, dashboard['id'], base_url=self.base_url, trailing_slash=False)
@logged
def status(self) -> dict:
    """
    Fetch ZMON status from the status API.

    :return: ZMON status.
    :rtype: dict
    """
    status_url = self.endpoint(STATUS)
    return self.json(self.session.get(status_url, timeout=self._timeout))
########################################################################################################################
# ENTITIES
########################################################################################################################
@trace(pass_span=True)
@logged
def get_entities(self, query=None, **kwargs) -> list:
    """
    Get ZMON entities, with optional filtering.

    :param query: Entity filtering query. Default is ``None``. Example query ``{'type': 'instance'}`` to return
                  all entities of type: ``instance``.
    :type query: dict

    :return: List of entities.
    :rtype: list
    """
    # Without a query no ``query`` request parameter is sent at all.
    if query:
        query_str = json.dumps(query)
        params = {'query': query_str}
    else:
        query_str = ''
        params = None

    logger.debug('Retrieving entities with query: {} ...'.format(query_str))

    span = extract_span_from_kwargs(**kwargs)
    span.log_kv({'query': query_str})

    response = self.session.get(self.endpoint(ENTITIES), params=params, timeout=self._timeout)
    return self.json(response)
@trace(pass_span=True)
@logged
def get_entity(self, entity_id: str, **kwargs) -> dict:
    """
    Retrieve single entity.

    :param entity_id: Entity ID.
    :type entity_id: str

    :return: Entity dict (the decoded JSON response body).
    :rtype: dict
    """
    logger.debug('Retrieving entities with id: {} ...'.format(entity_id))

    current_span = extract_span_from_kwargs(**kwargs)
    current_span.set_tag('entity_id', entity_id)

    # The single-entity URL is requested without a trailing slash.
    resp = self.session.get(self.endpoint(ENTITIES, entity_id, trailing_slash=False), timeout=self._timeout)

    return self.json(resp)
@trace(pass_span=True)
@logged
def add_entity(self, entity: dict, **kwargs) -> requests.Response:
    """
    Create or update an entity on ZMON.

    .. note::

        ZMON PUT entity API doesn't return JSON response.

    :param entity: Entity dict. Must contain ``id`` and ``type``; ``id`` must pass ``is_valid_entity_id``.
    :type entity: dict

    :return: Response object.
    :rtype: :class:`requests.Response`

    :raises: ZmonArgumentError
    """
    has_required = 'id' in entity and 'type' in entity
    if not has_required:
        raise ZmonArgumentError('Entity "id" and "type" are required.')

    entity_id = entity['id']
    if not self.is_valid_entity_id(entity_id):
        raise ZmonArgumentError('Invalid entity ID.')

    logger.debug('Adding new entity: {} ...'.format(entity_id))

    span = extract_span_from_kwargs(**kwargs)
    span.set_tag('entity_id', entity_id)

    # Serialized here (not via ``json=``) so that datetime values go through JSONDateEncoder.
    payload = json.dumps(entity, cls=JSONDateEncoder)
    response = self.session.put(self.endpoint(ENTITIES, trailing_slash=False), data=payload, timeout=self._timeout)
    response.raise_for_status()

    return response
@trace(pass_span=True)
@logged
def delete_entity(self, entity_id: str, **kwargs) -> bool:
    """
    Delete entity from ZMON.

    .. note::

        ZMON DELETE entity API doesn't return JSON response.

    :param entity_id: Entity ID.
    :type entity_id: str

    :return: True if the response body is ``'1'``, False otherwise.
    :rtype: bool
    """
    logger.debug('Removing existing entity: {} ...'.format(entity_id))

    current_span = extract_span_from_kwargs(**kwargs)
    current_span.set_tag('entity_id', entity_id)

    # Pass the client timeout like every other request does, so a stalled server cannot block forever.
    resp = self.session.delete(self.endpoint(ENTITIES, entity_id), timeout=self._timeout)
    resp.raise_for_status()

    return resp.text == '1'
########################################################################################################################
# DASHBOARD
########################################################################################################################
@trace(pass_span=True)
@logged
def get_dashboard(self, dashboard_id: str, **kwargs) -> dict:
    """
    Fetch a ZMON dashboard by ID.

    :param dashboard_id: ZMON dashboard ID.
    :type dashboard_id: int, str

    :return: Dashboard dict.
    :rtype: dict
    """
    span = extract_span_from_kwargs(**kwargs)
    span.set_tag('dashboard_id', dashboard_id)

    dashboard_url = self.endpoint(DASHBOARD, dashboard_id)
    return self.json(self.session.get(dashboard_url, timeout=self._timeout))
@trace(pass_span=True)
@logged
def update_dashboard(self, dashboard: dict, **kwargs) -> dict:
    """
    Create or update dashboard.

    If dashboard has a truthy ``id`` then that dashboard is updated, otherwise a new dashboard is created.

    :param dashboard: ZMON dashboard dict.
    :type dashboard: dict

    :return: Dashboard dict.
    :rtype: dict
    """
    current_span = extract_span_from_kwargs(**kwargs)

    if 'id' in dashboard and dashboard['id']:
        logger.debug('Updating dashboard with ID: {} ...'.format(dashboard['id']))

        current_span.set_tag('dashboard_id', dashboard['id'])

        resp = self.session.post(self.endpoint(DASHBOARD, dashboard['id']), json=dashboard, timeout=self._timeout)
    else:
        # new dashboard
        logger.debug('Adding new dashboard ...')
        resp = self.session.post(self.endpoint(DASHBOARD), json=dashboard, timeout=self._timeout)

    # self.json() already raises for HTTP error statuses, so no separate raise_for_status() is needed.
    return self.json(resp)
########################################################################################################################
# CHECK-DEFS
########################################################################################################################
@trace(pass_span=True)
@logged
def get_check_definition(self, definition_id: int, **kwargs) -> dict:
    """
    Retrieve check definition.

    An empty response body is treated as "not found" and raises like an HTTP 404.

    :param definition_id: Check definition id.
    :type definition_id: int

    :return: Check definition dict.
    :rtype: dict
    """
    span = extract_span_from_kwargs(**kwargs)
    span.set_tag('check_id', definition_id)

    response = self.session.get(self.endpoint(CHECK_DEF, definition_id), timeout=self._timeout)

    # TODO: total hack! API returns 200 if check def does not exist!
    # Rewriting the status makes the raise_for_status() inside self.json() fail for it.
    missing = response.text == ''
    if missing:
        response.status_code = 404
        response.reason = 'Not Found'

    return self.json(response)
@trace()
@logged
def get_check_definitions(self) -> list:
    """
    List all ``active`` check definitions.

    :return: List of check-defs (the ``check_definitions`` value of the response; ``None`` if absent).
    :rtype: list
    """
    response = self.session.get(self.endpoint(ACTIVE_CHECK_DEF), timeout=self._timeout)
    body = self.json(response)
    return body.get('check_definitions')
@trace(pass_span=True)
@logged
def update_check_definition(self, check_definition, skip_validation=False, **kwargs) -> dict:
    """
    Update existing check definition.

    Attribute ``owning_team`` is required. If ``status`` is not set, then it will be set to ``ACTIVE``
    on the passed dict.

    :param check_definition: ZMON check definition dict.
    :type check_definition: dict

    :param skip_validation: Skip validation of the check command syntax.
    :type skip_validation: bool

    :return: Check definition dict.
    :rtype: dict

    :raises: ZmonArgumentError
    """
    span = extract_span_from_kwargs(**kwargs)

    if 'owning_team' not in check_definition:
        span.set_tag('error', True)
        span.log_kv({'exception': 'Check definition must have "owning_team"'})
        raise ZmonArgumentError('Check definition must have "owning_team"')

    if 'status' not in check_definition:
        check_definition['status'] = 'ACTIVE'

    if not skip_validation:
        try:
            self.validate_check_command(check_definition['command'])
        except Exception:
            # Record the failure on the span, then let the original exception propagate.
            span.set_tag('error', True)
            span.log_kv({'exception': traceback.format_exc()})
            raise

    target = self.endpoint(CHECK_DEF)
    response = self.session.post(target, json=check_definition, timeout=self._timeout)

    return self.json(response)
@trace(pass_span=True)
@logged
def delete_check_definition(self, check_definition_id: int, **kwargs) -> requests.Response:
    """
    Delete existing check definition.

    :param check_definition_id: ZMON check definition ID.
    :type check_definition_id: int

    :return: HTTP response.
    :rtype: :class:`requests.Response`
    """
    current_span = extract_span_from_kwargs(**kwargs)
    current_span.set_tag('check_id', str(check_definition_id))

    # Pass the client timeout like every other request does, so a stalled server cannot block forever.
    resp = self.session.delete(self.endpoint(CHECK_DEF, check_definition_id), timeout=self._timeout)
    resp.raise_for_status()

    return resp
########################################################################################################################
# ALERT-DEFS & DATA
########################################################################################################################
@trace(pass_span=True)
@logged
def get_alert_definition(self, alert_id: int, **kwargs) -> dict:
    """
    Fetch an alert definition by ID.

    :param alert_id: Alert definition ID.
    :type alert_id: int

    :return: Alert definition dict.
    :rtype: dict
    """
    span = extract_span_from_kwargs(**kwargs)
    span.set_tag('alert_id', str(alert_id))

    alert_url = self.endpoint(ALERT_DEF, alert_id)
    return self.json(self.session.get(alert_url, timeout=self._timeout))
@trace()
@logged
def get_alert_definitions(self) -> list:
    """
    List all ``active`` alert definitions.

    :return: List of alert-defs (the ``alert_definitions`` value of the response; ``None`` if absent).
    :rtype: list
    """
    response = self.session.get(self.endpoint(ACTIVE_ALERT_DEF), timeout=self._timeout)
    body = self.json(response)
    return body.get('alert_definitions')
@trace(pass_span=True)
@logged
def create_alert_definition(self, alert_definition: dict, **kwargs) -> dict:
    """
    Create new alert definition.

    Attributes ``last_modified_by`` and ``check_definition_id`` are required.
    If ``status`` is not set, then it will be set to ``ACTIVE`` on the passed dict.

    :param alert_definition: ZMON alert definition dict.
    :type alert_definition: dict

    :return: Alert definition dict.
    :rtype: dict

    :raises: ZmonArgumentError
    """
    current_span = extract_span_from_kwargs(**kwargs)

    if 'last_modified_by' not in alert_definition:
        current_span.set_tag('error', True)
        current_span.log_kv({'exception': 'Alert definition must have "last_modified_by"'})
        raise ZmonArgumentError('Alert definition must have "last_modified_by"')

    if 'status' not in alert_definition:
        alert_definition['status'] = 'ACTIVE'

    if 'check_definition_id' not in alert_definition:
        current_span.set_tag('error', True)
        # Name the field that is actually missing in the span log.
        current_span.log_kv({'exception': 'Alert definition must have "check_definition_id"'})
        # Exception text is kept verbatim (spelling included) in case callers match on it.
        raise ZmonArgumentError('Alert defintion must have "check_definition_id"')

    current_span.set_tag('check_id', alert_definition['check_definition_id'])

    resp = self.session.post(self.endpoint(ALERT_DEF), json=alert_definition, timeout=self._timeout)

    return self.json(resp)
@trace(pass_span=True)
@logged
def update_alert_definition(self, alert_definition: dict, **kwargs) -> dict:
    """
    Update existing alert definition.

    Attributes ``id``, ``last_modified_by`` and ``check_definition_id`` are required.
    If ``status`` is not set, then it will be set to ``ACTIVE`` on the passed dict.

    :param alert_definition: ZMON alert definition dict.
    :type alert_definition: dict

    :return: Alert definition dict.
    :rtype: dict

    :raises: ZmonArgumentError
    """
    span = extract_span_from_kwargs(**kwargs)

    # Required fields are checked in this order: last_modified_by, id, check_definition_id.
    if 'last_modified_by' not in alert_definition:
        span.set_tag('error', True)
        span.log_kv({'exception': 'Alert definition must have "last_modified_by"'})
        raise ZmonArgumentError('Alert definition must have "last_modified_by"')

    if 'id' not in alert_definition:
        span.set_tag('error', True)
        span.log_kv({'exception': 'Alert definition must have "id"'})
        raise ZmonArgumentError('Alert definition must have "id"')

    alert_id = alert_definition['id']
    span.set_tag('alert_id', alert_id)

    if 'check_definition_id' not in alert_definition:
        span.set_tag('error', True)
        span.log_kv({'exception': 'Alert definition must have "check_definition_id"'})
        raise ZmonArgumentError('Alert defintion must have "check_definition_id"')

    span.set_tag('check_id', alert_definition['check_definition_id'])

    if 'status' not in alert_definition:
        alert_definition['status'] = 'ACTIVE'

    target = self.endpoint(ALERT_DEF, alert_id)
    response = self.session.put(target, json=alert_definition, timeout=self._timeout)

    return self.json(response)
@trace(pass_span=True)
@logged
def delete_alert_definition(self, alert_definition_id: int, **kwargs) -> dict:
    """
    Delete existing alert definition.

    :param alert_definition_id: ZMON alert definition ID.
    :type alert_definition_id: int

    :return: Alert definition dict (the decoded JSON response body).
    :rtype: dict
    """
    current_span = extract_span_from_kwargs(**kwargs)
    current_span.set_tag('alert_id', str(alert_definition_id))

    # Pass the client timeout like every other request does, so a stalled server cannot block forever.
    resp = self.session.delete(self.endpoint(ALERT_DEF, alert_definition_id), timeout=self._timeout)

    return self.json(resp)
@trace(pass_span=True)
@logged
def get_alert_data(self, alert_id: int, **kwargs) -> dict:
    """
    Fetch alert data for all entities of an alert.

    Response is a ``dict`` with entity ID as a key, and check return value as a value.

    :param alert_id: ZMON alert ID.
    :type alert_id: int

    :return: Alert data dict.
    :rtype: dict

    Example:

    .. code-block:: json

        {
            "entity-id-1": 122,
            "entity-id-2": 0,
            "entity-id-3": 100
        }
    """
    span = extract_span_from_kwargs(**kwargs)
    span.set_tag('alert_id', str(alert_id))

    data_url = self.endpoint(ALERT_DATA, alert_id, 'all-entities')
    return self.json(self.session.get(data_url, timeout=self._timeout))
########################################################################################################################
# SEARCH
########################################################################################################################
@trace(pass_span=True)
@logged
def search(self, q, limit=None, teams=None, **kwargs) -> dict:
    """
    Search ZMON dashboards, checks, alerts and grafana dashboards with optional team filtering.

    :param q: search query.
    :type q: str

    :param limit: Value sent as the ``limit`` request parameter. Omitted when falsy. Default is None.
    :type limit: int

    :param teams: List (or tuple) of team IDs, sent comma-separated. Default is None.
    :type teams: list

    :return: Search result.
    :rtype: dict

    :raises: ZmonArgumentError

    Example:

    .. code-block:: json

        {
            "alerts": [{"id": "123", "title": "ZMON alert", "team": "ZMON"}],
            "checks": [{"id": "123", "title": "ZMON check", "team": "ZMON"}],
            "dashboards": [{"id": "123", "title": "ZMON dashboard", "team": "ZMON"}],
            "grafana_dashboards": [{"id": "123", "title": "ZMON grafana", "team": ""}],
        }
    """
    current_span = extract_span_from_kwargs(**kwargs)

    # isinstance also accepts list/tuple subclasses, which the exact type() comparison rejected.
    if teams and not isinstance(teams, (list, tuple)):
        current_span.set_tag('error', True)
        current_span.log_kv({'exception': '"teams" should be a list'})
        raise ZmonArgumentError('"teams" should be a list!')

    params = {'query': q}
    if limit:
        params['limit'] = limit
    if teams:
        params['teams'] = ','.join(teams)

    # log_kv takes a key/value mapping; this used to be a set literal ({'query', ...}).
    current_span.log_kv({'query': json.dumps(params)})

    resp = self.session.get(self.endpoint(SEARCH), params=params, timeout=self._timeout)

    return self.json(resp)
########################################################################################################################
# ONETIME-TOKENS
########################################################################################################################
@trace()
@logged
def list_onetime_tokens(self) -> list:
    """
    List existing one-time tokens.

    :return: List of one-time tokens, with relevant attributes.
    :rtype: list

    Example:

    .. code-block:: yaml

        - bound_at: 2016-09-08 14:00:12.645999
          bound_expires: 1503744673506
          bound_ip: 192.168.20.16
          created: 2016-08-26 12:51:13.506000
          token: 9pSzKpcO
    """
    tokens_url = self.endpoint(TOKENS)
    return self.json(self.session.get(tokens_url, timeout=self._timeout))
@trace()
@logged
def get_onetime_token(self) -> str:
    """
    Request a new one-time token.

    You can use :func:`zmon_cli.client.Zmon.token_login_url` to return a deeplink to one-time login.

    :return: One-time token (the raw response text).
    :rtype: str
    """
    tokens_url = self.endpoint(TOKENS)
    response = self.session.post(tokens_url, json={}, timeout=self._timeout)
    response.raise_for_status()

    return response.text
########################################################################################################################
# GRAFANA
########################################################################################################################
@trace(pass_span=True)
@logged
def get_grafana_dashboard(self, grafana_dashboard_uid: str, **kwargs) -> dict:
    """
    Fetch a Grafana dashboard by UID.

    :param grafana_dashboard_uid: Grafana dashboard UID.
    :type grafana_dashboard_uid: str

    :return: Grafana dashboard dict.
    :rtype: dict
    """
    span = extract_span_from_kwargs(**kwargs)
    span.set_tag('grafana_dashboard_uid', grafana_dashboard_uid)

    # This URL is requested without a trailing slash.
    dashboard_url = self.endpoint(GRAFANA, grafana_dashboard_uid, trailing_slash=False)
    return self.json(self.session.get(dashboard_url, timeout=self._timeout))
@trace(pass_span=True)
@logged
def update_grafana_dashboard(self, grafana_dashboard: dict, **kwargs) -> dict:
    """
    Update existing Grafana dashboard.

    Attributes ``uid`` and ``title`` of ``grafana_dashboard['dashboard']`` are required.

    :param grafana_dashboard: Grafana dashboard dict; must contain a ``dashboard`` dict.
    :type grafana_dashboard: dict

    :return: Grafana dashboard dict.
    :rtype: dict

    :raises: ZmonArgumentError
    """
    span = extract_span_from_kwargs(**kwargs)
    board = grafana_dashboard['dashboard']

    if 'uid' not in board:
        span.set_tag('error', True)
        span.log_kv({'exception': 'Grafana dashboard must have "uid". Use Grafana6 dashboard format.'})
        raise ZmonArgumentError('Grafana dashboard must have "uid". Hint: Use Grafana6 dashboard format.')

    if 'title' not in board:
        span.set_tag('error', True)
        span.log_kv({'exception': 'Grafana dashboard must have "title"'})
        raise ZmonArgumentError('Grafana dashboard must have "title"')

    span.set_tag('grafana_dashboard_uid', board['uid'])

    # The numeric id is tagged only when present and not None.
    if 'id' in board and board['id'] is not None:
        span.set_tag('grafana_dashboard_id', board['id'])

    # NOTE(review): the dict is serialized to a string and then handed to ``json=``, which encodes
    # it a second time. Looks like double encoding, but the server may expect it - confirm against
    # the ZMON API before changing.
    encoded = json.dumps(grafana_dashboard)
    response = self.session.post(self.endpoint(GRAFANA), json=encoded, timeout=self._timeout)

    return self.json(response)
########################################################################################################################
# DOWNTIMES
########################################################################################################################
@trace(pass_span=True)
@logged
def create_downtime(self, downtime: dict, **kwargs) -> dict:
    """
    Create a downtime for specific entities.

    Attributes ``entities`` list, ``start_time`` and ``end_time`` timestamps are required
    (each must be present and truthy).

    :param downtime: Downtime dict.
    :type downtime: dict

    :return: Downtime dict.
    :rtype: dict

    :raises: ZmonArgumentError

    Example downtime:

    .. code-block:: json

        {
            "entities": ["entity-id-1", "entity-id-2"],
            "comment": "Planned maintenance",
            "start_time": 1473337437.312921,
            "end_time": 1473341037.312921,
        }
    """
    span = extract_span_from_kwargs(**kwargs)

    if not downtime.get('entities'):
        span.set_tag('error', True)
        span.log_kv({'exception': 'At least one entity ID should be specified'})
        raise ZmonArgumentError('At least one entity ID should be specified')

    has_window = downtime.get('start_time') and downtime.get('end_time')
    if not has_window:
        span.set_tag('error', True)
        span.log_kv({'exception': 'Downtime must specify "start_time" and "end_time"'})
        raise ZmonArgumentError('Downtime must specify "start_time" and "end_time"')

    span.set_tag('entity_ids', str(downtime.get('entities')))
    # FIXME - should start_time / end_time be tagged on the span as well?

    response = self.session.post(self.endpoint(DOWNTIME), json=downtime, timeout=self._timeout)

    return self.json(response)
########################################################################################################################
# GROUPS - MEMBERS - ???
########################################################################################################################
@logged
def get_groups(self):
    """Fetch the groups endpoint and return its decoded JSON response."""
    groups_url = self.endpoint(GROUPS)
    return self.json(self.session.get(groups_url, timeout=self._timeout))
@logged
def switch_active_user(self, group_name, user_name):
    """Clear the active entry of a group, then set ``user_name`` as its active user.

    :param group_name: Group name used in the request path.
    :param user_name: User to make active.

    :return: True if the body of the second (PUT) response is ``'1'``, False otherwise.
    :rtype: bool
    """
    # Pass the client timeout here too (the PUT below already does), so a stalled server cannot block forever.
    resp = self.session.delete(self.endpoint(GROUPS, group_name, 'active'), timeout=self._timeout)

    if not resp.ok:
        logger.error('Failed to de-activate group: {}'.format(group_name))
        resp.raise_for_status()

    logger.debug('Switching active user: {}'.format(user_name))

    resp = self.session.put(self.endpoint(GROUPS, group_name, 'active', user_name), timeout=self._timeout)

    if not resp.ok:
        logger.error('Failed to switch active user {}'.format(user_name))
        resp.raise_for_status()

    return resp.text == '1'
@logged
def add_member(self, group_name, user_name):
    """PUT ``user_name`` as a member of ``group_name``; return True if the response body is ``'1'``."""
    member_url = self.endpoint(GROUPS, group_name, MEMBER, user_name)
    response = self.session.put(member_url, timeout=self._timeout)
    response.raise_for_status()

    return response.text == '1'
@logged
def remove_member(self, group_name, user_name):
    """DELETE ``user_name`` from the members of ``group_name``.

    :return: True if the response body is ``'1'``, False otherwise.
    :rtype: bool
    """
    # Pass the client timeout like add_member does, so a stalled server cannot block forever.
    resp = self.session.delete(self.endpoint(GROUPS, group_name, MEMBER, user_name), timeout=self._timeout)
    resp.raise_for_status()

    return resp.text == '1'
@logged
def add_phone(self, member_email, phone_nr):
    """PUT ``phone_nr`` as a phone of ``member_email``; return True if the response body is ``'1'``."""
    phone_url = self.endpoint(GROUPS, member_email, PHONE, phone_nr)
    response = self.session.put(phone_url, timeout=self._timeout)
    response.raise_for_status()

    return response.text == '1'
@logged
def remove_phone(self, member_email, phone_nr):
    """DELETE ``phone_nr`` from the phones of ``member_email``.

    :return: True if the response body is ``'1'``, False otherwise.
    :rtype: bool
    """
    # Pass the client timeout like add_phone does, so a stalled server cannot block forever.
    resp = self.session.delete(self.endpoint(GROUPS, member_email, PHONE, phone_nr), timeout=self._timeout)
    resp.raise_for_status()

    return resp.text == '1'
@logged
def set_name(self, member_email, member_name):
    """PUT ``member_name`` for ``member_email`` and return the raw response object.

    :return: Response object (not a bool, unlike the other group/member helpers).
    """
    # NOTE(review): the path uses the PHONE segment, exactly like add_phone - possibly a
    # copy/paste slip; confirm against the ZMON groups API before changing.
    name_url = self.endpoint(GROUPS, member_email, PHONE, member_name)
    response = self.session.put(name_url, timeout=self._timeout)
    response.raise_for_status()

    return response