#------------------------------------------------------------------------------ # # Copyright (c) Microsoft Corporation. # All rights reserved. # # This code is licensed under the MIT License. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files(the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and / or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions : # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. # #------------------------------------------------------------------------------ import uuid from datetime import datetime, timedelta import requests from . import log from . import util from . import wstrust_response from .adal_error import AdalError from .constants import WSTrustVersion _USERNAME_PLACEHOLDER = '{UsernamePlaceHolder}' _PASSWORD_PLACEHOLDER = '{PasswordPlaceHolder}' class WSTrustRequest(object): def __init__(self, call_context, watrust_endpoint_url, applies_to, wstrust_endpoint_version): self._log = log.Logger('WSTrustRequest', call_context['log_context']) self._call_context = call_context self._wstrust_endpoint_url = watrust_endpoint_url self._applies_to = applies_to self._wstrust_endpoint_version = wstrust_endpoint_version @staticmethod def _build_security_header(): time_now = datetime.utcnow() expire_time = time_now + timedelta(minutes=10) time_now_str = time_now.isoformat()[:-3] + 'Z' expire_time_str = expire_time.isoformat()[:-3] + 'Z' security_header_xml = ("" "" "" + time_now_str + "" "" + expire_time_str + "" "" "" "" + _USERNAME_PLACEHOLDER + "" "" + _PASSWORD_PLACEHOLDER + "" "" "") return security_header_xml @staticmethod def _populate_rst_username_password(template, username, password): password = WSTrustRequest._escape_password(password) return template.replace(_USERNAME_PLACEHOLDER, username).replace(_PASSWORD_PLACEHOLDER, password) @staticmethod def _escape_password(password): return password.replace('&', '&').replace('"', '"').replace("'", ''').replace('<', '<').replace('>', '>') def _build_rst(self, username, password): message_id = str(uuid.uuid4()) schema_location = 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd' soap_action = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/RST/Issue' rst_trust_namespace = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512' key_type = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/Bearer' request_type = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/Issue' if self._wstrust_endpoint_version == WSTrustVersion.WSTRUST2005: soap_action = 'http://schemas.xmlsoap.org/ws/2005/02/trust/RST/Issue' rst_trust_namespace = 'http://schemas.xmlsoap.org/ws/2005/02/trust' key_type = 'http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey' request_type = 'http://schemas.xmlsoap.org/ws/2005/02/trust/Issue' rst_template = ("".format(schema_location) + "" + "{}".format(soap_action) + "urn:uuid:{}".format(message_id) + "" + "http://www.w3.org/2005/08/addressing/anonymous" + "" + "{}".format(self._wstrust_endpoint_url) + WSTrustRequest._build_security_header() + "" + "" + "".format(rst_trust_namespace) + "" + "" + "{}".format(self._applies_to) + "" + "" + "{}".format(key_type) + "{}".format(request_type) + "" + "" + "") self._log.debug('Created RST: \n %(rst_template)s', {"rst_template": rst_template}) return WSTrustRequest._populate_rst_username_password(rst_template, username, password) def _handle_rstr(self, body): wstrust_resp = wstrust_response.WSTrustResponse(self._call_context, body, self._wstrust_endpoint_version) wstrust_resp.parse() return wstrust_resp def acquire_token(self, username, password): if self._wstrust_endpoint_version == WSTrustVersion.UNDEFINED: raise AdalError('Unsupported wstrust endpoint version. Current support version is wstrust2005 or wstrust13.') rst = self._build_rst(username, password) if self._wstrust_endpoint_version == WSTrustVersion.WSTRUST2005: soap_action = 'http://schemas.xmlsoap.org/ws/2005/02/trust/RST/Issue' else: soap_action = 'http://docs.oasis-open.org/ws-sx/ws-trust/200512/RST/Issue' headers = {'headers': {'Content-type':'application/soap+xml; charset=utf-8', 'SOAPAction': soap_action}, 'body': rst} options = util.create_request_options(self, headers) self._log.debug("Sending RST to: %(wstrust_endpoint)s", {"wstrust_endpoint": self._wstrust_endpoint_url}) operation = "WS-Trust RST" resp = requests.post(self._wstrust_endpoint_url, headers=options['headers'], data=rst, allow_redirects=True, verify=self._call_context.get('verify_ssl', None), proxies=self._call_context.get('proxies', None), timeout=self._call_context.get('timeout', None)) util.log_return_correlation_id(self._log, operation, resp) if resp.status_code == 429: resp.raise_for_status() # Will raise requests.exceptions.HTTPError if not util.is_http_success(resp.status_code): return_error_string = u"{} request returned http error: {}".format(operation, resp.status_code) error_response = "" if resp.text: return_error_string = u"{} and server response: {}".format(return_error_string, resp.text) try: error_response = resp.json() except ValueError: pass raise AdalError(return_error_string, error_response) else: return self._handle_rstr(resp.text)