# -*- coding: utf-8 -*-
# Copyright 2016 Dana James Traversie and Check Point Software Technologies, Ltd. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# cpauto.core.sessions
# ~~~~~~~~~~~~~~~~~~~~
"""This module contains the primary objects needed to manage R80 Web API sessions."""
from .exceptions import (
CoreClientError,
WaitOnTaskError,
ConnectionError,
HTTPError,
SSLError,
Timeout,
TooManyRedirects,
InvalidURL
)
from ..objects._common import _CommonClient
import time
import requests
[docs]class CoreClientResult:
"""Stores the status code and JSON body
received in an HTTP response to an API request.
"""
def __init__(self, status_code, json):
self.status_code = status_code
self.success = status_code == 200
self.message = ""
self.__json = json
[docs] def set_success(self, value=True):
self.success = value
[docs] def set_message(self, value=""):
self.message = value
[docs] def json(self):
return dict(self.__json)
[docs]class CoreClient:
"""The cpauto core client.
Provides basic configuration and persistence.
Basic Usage::
>>> import cpauto
>>> cc = cpauto.CoreClient('admin', 'vpn123', '10.11.12.13')
>>> r = cc.login()
>>> r.status_code
200
"""
def __init__(self, user='', password='', mgmt_server='', port=443, verify=True, wait_for_tasks=True):
self.__last_login_result = None
self.__user = user
self.__password = password
self.__mgmt_server = mgmt_server
self.__port = port
self.__verify = verify
self.__wait_for_tasks = wait_for_tasks
def __build_uri(self, endpoint):
uri = 'https://' + self.__mgmt_server + ':' + str(self.__port) + '/web_api/' + endpoint
return uri
def __build_headers(self, send_sid=True):
headers = { 'content-type': 'application/json', 'user-agent': 'cpauto-CoreClient/0.0.5' }
if send_sid and self.__last_login_result is not None:
last_login_json = self.__last_login_result.json()
headers['x-chkp-sid'] = last_login_json['sid']
return headers
def __wait_on_task(self, task_id):
task_complete = False
task_r = None
while not task_complete:
task_r = self.http_post(endpoint="show-task", payload={"task-id": task_id, "details-level": "full"})
while task_r.status_code == 404:
time.sleep(1)
task_r = self.http_post(endpoint="show-task", payload={"task-id": task_id, "details-level": "full"})
if task_r.status_code != 200:
raise WaitOnTaskError("Failed to handle asynchronous task as synchronous")
data = task_r.json()
completed_tasks = sum(1 for task in data["tasks"] if task["status"] != "in progress")
total_tasks = len(data["tasks"])
if completed_tasks == total_tasks:
task_complete = True
else:
time.sleep(2)
self.__check_task_result(task_r)
return task_r
def __wait_on_tasks(self, task_objects):
tasks = []
for task_object in task_objects:
task_id = task_object["task-id"]
tasks.append(task_id)
self.__wait_on_task(task_id)
task_r = self.http_post(endpoint="show-task", payload={"task-id": tasks, "details-level": "full"})
self.__check_task_result(task_r)
return task_r
def __check_task_result(self, task_result):
data = task_result.json()
for task in data["tasks"]:
if task["status"] == "failed" or task["status"] == "partially succeeded":
task_result.set_success(False)
task_result.set_message("There was at least one task that failed or partially succeeded")
break
[docs] def http_post(self, endpoint, send_sid=True, payload={}):
"""Makes an HTTP post to the specified API endpoint using user supplied data.
:param endpoint: The API endpoint (e.g. /login).
:param send_sid: Send the session ID as a header when true.
:param payload: The payload (dictionary) that will be included
as JSON in the body of the request.
:rtype: CoreClientResult
"""
uri = self.__build_uri(endpoint)
headers = self.__build_headers(send_sid)
try:
r = requests.post(uri, headers=headers, json=payload, verify=self.__verify)
# wait for tasks if needed
if self.__wait_for_tasks and r.status_code == 200 and endpoint != "show-task":
data = r.json()
if "task-id" in data:
return self.__wait_on_task(data["task-id"])
elif "tasks" in data:
return self.__wait_on_tasks(data["tasks"])
except requests.exceptions.SSLError as e:
raise SSLError('SSL error: ' + str(e))
except requests.exceptions.ConnectionError as e:
raise ConnectionError('Connection error: ' + str(e))
except requests.exceptions.HTTPError as e:
raise HTTPError('HTTP error: ' + str(e))
except requests.exceptions.Timeout as e:
raise Timeout(str(e))
except requests.exceptions.TooManyRedirects as e:
raise TooManyRedirects(str(e))
except requests.exceptions.InvalidURL as e:
raise InvalidURL(str(e))
return CoreClientResult(r.status_code, r.json())
[docs] def merge_payloads(self, payload_a, payload_b):
"""Merges the contents of two payloads (dictionaries).
:param payload_a: A payload to merge
:param payload_b: Another payload to merge
:returns: A single payload (dictionary) with the contents of the two original payloads
"""
payload_c = payload_a.copy()
payload_c.update(payload_b)
return payload_c
[docs] def login(self, params={}):
"""Login to the R80 Web API server and store the results
of the request as a class attribute.
https://sc1.checkpoint.com/documents/R80/APIs/#web/login
:param params: (optional) A dictionary of additional, supported parameter names and values.
:rtype: CoreClientResult
"""
payload = { 'user': self.__user,
'password': self.__password }
if params:
payload = self.merge_payloads(payload, params)
r = self.http_post('login', send_sid=False, payload=payload)
self.__last_login_result = r
return r
[docs] def logout(self):
"""Logout of the R80 Web API server and invalidate the session.
https://sc1.checkpoint.com/documents/R80/APIs/#web/logout
:rtype: CoreClientResult
"""
return self.http_post('logout')
[docs] def publish(self, uid=""):
"""Makes all changes made visible to other users.
https://sc1.checkpoint.com/documents/R80/APIs/#web/publish
:param uid: (optional) Specify a different session unique
identifier to publish.
:rtype: CoreClientResult
"""
payload = {}
if uid:
payload['uid'] = uid
return self.http_post('publish', payload=payload)
[docs] def discard(self, uid=""):
"""Discards all changes made and removes them from the database.
https://sc1.checkpoint.com/documents/R80/APIs/#web/discard
:param uid: (optional) Specify a different sessions unique
identifier to discard.
:rtype: CoreClientResult
"""
payload = {}
if uid:
payload['uid'] = uid
return self.http_post('discard', payload=payload)
[docs] def keepalive(self):
"""Keeps the session alive and valid.
https://sc1.checkpoint.com/documents/R80/APIs/#web/keepalive
:rtype: CoreClientResult
"""
return self.http_post('keepalive')
[docs]class Session:
"""Manage sessions."""
def __init__(self, core_client):
self.__core_client = core_client
self.__common_client = _CommonClient(core_client)
[docs] def switch(self, uid=""):
"""Switch sessions.
https://sc1.checkpoint.com/documents/R80/APIs/#web/switch-session
:param uid: A session unique identifier.
:rtype: CoreClientResult
"""
payload = { 'uid': uid }
return self.__core_client.http_post('switch-session', payload=payload)
[docs] def show(self, uid=""):
"""Shows details of current session or a session with
the specified uid.
https://sc1.checkpoint.com/documents/R80/APIs/#web/show-session
:param uid: (optional) The unique identifier of an existing session.
:rtype: CoreClientResult
"""
return self.__common_client._show('show-session', name="", uid=uid, details_level="")
[docs] def set(self, params={}):
"""Sets new values for certain parameters of the current session.
https://sc1.checkpoint.com/documents/R80/APIs/#web/set-session
:param params: (optional) A dictionary of additional, supported parameter names and values.
:rtype: CoreClientResult
"""
return self.__common_client._set('set-session', name="", uid="", params=params)
[docs] def continue_in_sc(self, uid=""):
"""Logout from existing session. The session will be continued next time your open
SmartConsole. In case 'uid' is not provided, use current session. In order for the
session to pass successfully to SmartConsole, make sure you don't have any other
active GUI sessions.
https://sc1.checkpoint.com/documents/R80/APIs/#web/continue-session-in-smartconsole
:param uid: (optional) The unique identifier of an existing session.
:rtype: CoreClientResult
"""
payload = {}
if uid:
payload['uid'] = uid
return self.__core_client.http_post('continue-session-in-smartconsole', payload=payload)
[docs] def show_all(self, limit=50, offset=0, order=[], details_level=''):
"""Shows all sessions with some reasonable limitations.
https://sc1.checkpoint.com/documents/R80/APIs/#web/show-sessions
:param limit: (optional) Limit the total number of sessions shown.
The default value is 50 and allowed values are in the range 1 to 500.
:param offset: (optional) Skip a number of sessions in the results
before they are shown. Default value is 0.
:param order: (optional) Sort the results by the specified field. The
default is a random order.
:param details_level: (optional) The level of detail to show. Default
value is 'standard' and the other options are: 'uid' or 'full'
:rtype: CoreClientResult
"""
return self.__common_client._show_all('show-sessions', limit=limit,
offset=offset, order=order, details_level=details_level)
[docs]class LoginMessage:
"""Manage login message."""
def __init__(self, core_client):
self.__core_client = core_client
self.__common_client = _CommonClient(core_client)
[docs] def show(self):
"""Shows details of current login message.
https://sc1.checkpoint.com/documents/R80/APIs/#web/show-login-message
:rtype: CoreClientResult
"""
return self.__common_client._show('show-login-message', name="", uid="", details_level="")
[docs] def set(self, params={}):
"""Sets new values for current login message.
https://sc1.checkpoint.com/documents/R80/APIs/#web/set-login-message
:param params: (optional) A dictionary of additional, supported parameter names and values.
:rtype: CoreClientResult
"""
return self.__common_client._set('set-login-message', name="", uid="", params=params)