| """ |
| Low-level helpers for the SecureTransport bindings. |
| |
| These are Python functions that are not directly related to the high-level APIs |
| but are necessary to get them to work. They include a whole bunch of low-level |
| CoreFoundation messing about and memory management. The concerns in this module |
| are almost entirely about trying to avoid memory leaks and providing |
| appropriate and useful assistance to the higher-level code. |
| """ |
| import base64 |
| import ctypes |
| import itertools |
| import os |
| import re |
| import ssl |
| import struct |
| import tempfile |
| |
| from .bindings import CFConst, CoreFoundation, Security |
| |
# Matches every certificate in a PEM bundle and captures the base64 payload
# that sits between the BEGIN/END marker lines. DOTALL lets the payload span
# several lines; the non-greedy group stops at the first END marker.
_PEM_CERTS_RE = re.compile(
    b"-----BEGIN CERTIFICATE-----\n(.*?)\n-----END CERTIFICATE-----",
    flags=re.DOTALL,
)
| |
| |
def _cf_data_from_bytes(bytestring):
    """
    Wrap a Python bytestring in a newly created CFData object.

    The returned CFData is owned by the caller, who must CFRelease it.
    """
    allocator = CoreFoundation.kCFAllocatorDefault
    length = len(bytestring)
    return CoreFoundation.CFDataCreate(allocator, bytestring, length)
| |
| |
def _cf_dictionary_from_tuples(tuples):
    """
    Build a CFDictionary from a list of ``(key, value)`` Python tuples.

    Keys and values are copied into two parallel CFTypeRef arrays, in the
    order given, and handed to CFDictionaryCreate.

    NOTE(review): the result comes from a CoreFoundation "Create" call, so the
    caller presumably owns it and must CFRelease it -- confirm at call sites.
    """
    count = len(tuples)

    # Both arrays must list their entries in the same order so that key i
    # pairs up with value i.
    ref_array_type = CoreFoundation.CFTypeRef * count
    cf_keys = ref_array_type(*[pair[0] for pair in tuples])
    cf_values = ref_array_type(*[pair[1] for pair in tuples])

    return CoreFoundation.CFDictionaryCreate(
        CoreFoundation.kCFAllocatorDefault,
        cf_keys,
        cf_values,
        count,
        CoreFoundation.kCFTypeDictionaryKeyCallBacks,
        CoreFoundation.kCFTypeDictionaryValueCallBacks,
    )
| |
| |
def _cfstr(py_bstr):
    """
    Create a CFString from Python binary data, interpreted as UTF-8.

    The returned string is owned by the caller, who must CFRelease it.
    """
    # Wrap the bytes first so CoreFoundation receives a C string pointer.
    raw_c_string = ctypes.c_char_p(py_bstr)
    return CoreFoundation.CFStringCreateWithCString(
        CoreFoundation.kCFAllocatorDefault,
        raw_c_string,
        CFConst.kCFStringEncodingUTF8,
    )
| |
| |
def _create_cfstring_array(lst):
    """
    Build a CFMutableArray of CFStrings from a list of Python binary data.

    The returned array is owned by the caller, who must CFRelease it.

    Raises an ssl.SSLError on any failure; a partially built array is
    released before the error is raised.
    """
    array_ref = None
    try:
        array_ref = CoreFoundation.CFArrayCreateMutable(
            CoreFoundation.kCFAllocatorDefault,
            0,
            ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
        )
        if not array_ref:
            raise MemoryError("Unable to allocate memory!")

        for entry in lst:
            string_ref = _cfstr(entry)
            if not string_ref:
                raise MemoryError("Unable to allocate memory!")
            try:
                CoreFoundation.CFArrayAppendValue(array_ref, string_ref)
            finally:
                # Our own reference to the string is dropped whether or not
                # the append succeeded.
                CoreFoundation.CFRelease(string_ref)
    except BaseException as exc:
        if array_ref:
            CoreFoundation.CFRelease(array_ref)
        raise ssl.SSLError("Unable to allocate array: %s" % (exc,))
    else:
        return array_ref
| |
| |
def _cf_string_to_unicode(value):
    """
    Convert a CFString object into a Python Unicode string.

    This exists purely to support error reporting. It first asks for a direct
    pointer to the string's UTF-8 bytes and, when none is available, copies
    the string into a 1024-byte buffer instead.

    Raises OSError if the copy into the buffer fails.
    """
    cf_pointer = ctypes.cast(value, ctypes.POINTER(ctypes.c_void_p))

    raw = CoreFoundation.CFStringGetCStringPtr(
        cf_pointer, CFConst.kCFStringEncodingUTF8
    )
    if raw is None:
        # No direct pointer was returned: copy into our own buffer.
        scratch = ctypes.create_string_buffer(1024)
        copied = CoreFoundation.CFStringGetCString(
            cf_pointer, scratch, 1024, CFConst.kCFStringEncodingUTF8
        )
        if not copied:
            raise OSError("Error copying C string from CFStringRef")
        raw = scratch.value

    return None if raw is None else raw.decode("utf-8")
| |
| |
def _assert_no_error(error, exception_class=None):
    """
    Check an OSStatus return code and raise if it reports an error.

    :param error: The status code returned by a Security framework call.
        ``0`` means success and makes this function return ``None``.
    :param exception_class: The exception type to raise on failure. Defaults
        to ``ssl.SSLError`` when ``None``.
    :raises exception_class: With the system's error message when one is
        available, otherwise with ``"OSStatus <code>"``.
    """
    if error == 0:
        return

    output = None
    cf_error_string = Security.SecCopyErrorMessageString(error, None)
    # The lookup may hand back NULL when no message is available; converting
    # or releasing a NULL reference would crash, so only touch real strings.
    if cf_error_string:
        try:
            output = _cf_string_to_unicode(cf_error_string)
        finally:
            # Release our copy even if the conversion raised, so that the
            # error path itself does not leak.
            CoreFoundation.CFRelease(cf_error_string)

    if output is None or output == u"":
        output = u"OSStatus %s" % error

    if exception_class is None:
        exception_class = ssl.SSLError

    raise exception_class(output)
| |
| |
def _cert_array_from_pem(pem_bundle):
    """
    Turn a bundle of PEM-encoded certificates into a CFArray of certificate
    objects suitable for validating a cert chain.

    On success the caller owns the returned array and must release it. On
    failure the array is released here and an ssl.SSLError is raised.
    """
    # Work on a copy with Windows line endings normalised, since the PEM
    # regular expression only matches "\n".
    normalized = pem_bundle.replace(b"\r\n", b"\n")

    der_certs = []
    for match in _PEM_CERTS_RE.finditer(normalized):
        der_certs.append(base64.b64decode(match.group(1)))
    if not der_certs:
        raise ssl.SSLError("No root certificates specified")

    cert_array = CoreFoundation.CFArrayCreateMutable(
        CoreFoundation.kCFAllocatorDefault,
        0,
        ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
    )
    if not cert_array:
        raise ssl.SSLError("Unable to allocate memory!")

    try:
        for der in der_certs:
            der_data = _cf_data_from_bytes(der)
            if not der_data:
                raise ssl.SSLError("Unable to allocate memory!")

            certificate = Security.SecCertificateCreateWithData(
                CoreFoundation.kCFAllocatorDefault, der_data
            )
            # The CFData wrapper is no longer needed once the create call
            # has returned, whether or not it produced a certificate.
            CoreFoundation.CFRelease(der_data)
            if not certificate:
                raise ssl.SSLError("Unable to build cert object!")

            CoreFoundation.CFArrayAppendValue(cert_array, certificate)
            CoreFoundation.CFRelease(certificate)
    except Exception:
        # Only free the array when something went wrong; on success it is
        # the caller's job to release it.
        CoreFoundation.CFRelease(cert_array)
        raise

    return cert_array
| |
| |
def _is_cert(item):
    """
    Report whether the given CFTypeRef is a certificate, by comparing its
    CoreFoundation type ID with the certificate type ID.
    """
    certificate_type_id = Security.SecCertificateGetTypeID()
    item_type_id = CoreFoundation.CFGetTypeID(item)
    return item_type_id == certificate_type_id
| |
| |
def _is_identity(item):
    """
    Report whether the given CFTypeRef is an identity, by comparing its
    CoreFoundation type ID with the identity type ID.
    """
    identity_type_id = Security.SecIdentityGetTypeID()
    item_type_id = CoreFoundation.CFGetTypeID(item)
    return item_type_id == identity_type_id
| |
| |
def _temporary_keychain():
    """
    Create a temporary Mac keychain for working with credentials.

    The keychain is protected by a one-time random password and stored in a
    file inside a freshly created temporary directory. We expect to have one
    keychain per socket.

    Returns a tuple of the SecKeychainRef and the path to the temporary
    directory that contains it. The caller must free the SecKeychainRef,
    including calling SecKeychainDelete.

    If the keychain cannot be created, the temporary directory is removed
    again before the error propagates.
    """
    # Imported locally: only the failure path of this function needs it.
    import shutil

    # SecKeychainCreate requires a path to a keychain, so we cannot use
    # mkstemp to get a generic temporary file. Instead we create a temporary
    # directory and pick a filename inside it. From 40 random bytes, the first
    # 8 become the filename and the remaining 32 become the password, both
    # base16 (hex) encoded.
    random_bytes = os.urandom(40)
    filename = base64.b16encode(random_bytes[:8]).decode("utf-8")
    password = base64.b16encode(random_bytes[8:])  # Must be valid UTF-8
    tempdirectory = tempfile.mkdtemp()

    try:
        keychain_path = os.path.join(tempdirectory, filename).encode("utf-8")

        # We now want to create the keychain itself.
        keychain = Security.SecKeychainRef()
        status = Security.SecKeychainCreate(
            keychain_path, len(password), password, False, None, ctypes.byref(keychain)
        )
        _assert_no_error(status)
    except BaseException:
        # Nobody else knows about the directory yet, so it would be leaked
        # on disk if we did not remove it here.
        shutil.rmtree(tempdirectory, ignore_errors=True)
        raise

    # Having created the keychain, we want to pass it off to the caller.
    return keychain, tempdirectory
| |
| |
def _load_items_from_file(keychain, path):
    """
    Load all the trust objects from a single file into lists and the keychain.

    :param keychain: The keychain that SecItemImport inserts imported items
        into.
    :param path: Path of the file to read (opened in binary mode).
    :returns: A tuple of lists ``(identities, certificates)``. Every item in
        the lists has been CFRetained and is owned by the caller.
    :raises ssl.SSLError: If the file data cannot be wrapped in a CFData
        object or the import reports an error.
    """
    certificates = []
    identities = []
    result_array = None

    with open(path, "rb") as f:
        raw_filedata = f.read()

    # Create the CFData before entering the try block so that the cleanup
    # below never sees an unbound or NULL reference.
    filedata = _cf_data_from_bytes(raw_filedata)
    if not filedata:
        raise ssl.SSLError("Unable to allocate memory!")

    try:
        result_array = CoreFoundation.CFArrayRef()
        result = Security.SecItemImport(
            filedata,  # cert data
            None,  # Filename, leaving it out for now
            None,  # What the type of the file is, we don't care
            None,  # what's in the file, we don't care
            0,  # import flags
            None,  # key params, can include passphrase in the future
            keychain,  # The keychain to insert into
            ctypes.byref(result_array),  # Results
        )
        _assert_no_error(result)

        # A CFArray is not very useful to us as an intermediary
        # representation, so we extract the objects we want and then free the
        # array. We don't need to keep hold of keys: the keychain already has
        # them!
        result_count = CoreFoundation.CFArrayGetCount(result_array)
        for index in range(result_count):
            item = CoreFoundation.CFArrayGetValueAtIndex(result_array, index)
            item = ctypes.cast(item, CoreFoundation.CFTypeRef)

            # Retain what we keep: the array is released below and would
            # otherwise take the items with it.
            if _is_cert(item):
                CoreFoundation.CFRetain(item)
                certificates.append(item)
            elif _is_identity(item):
                CoreFoundation.CFRetain(item)
                identities.append(item)
    except Exception:
        # The lists are never returned on failure, so drop the references we
        # took on anything collected so far.
        for retained in identities + certificates:
            CoreFoundation.CFRelease(retained)
        raise
    finally:
        if result_array:
            CoreFoundation.CFRelease(result_array)

        CoreFoundation.CFRelease(filedata)

    return (identities, certificates)
| |
| |
def _load_client_cert_chain(keychain, *paths):
    """
    Load certificates and maybe keys from a number of files.

    The end goal is a CFArray containing one SecIdentityRef followed by zero
    or more SecCertificateRef objects, suitable for use as a client
    certificate trust chain.

    :param keychain: The keychain that imported keys are placed into and that
        identities are looked up in.
    :param paths: File paths to import, in order. Falsy entries are skipped.
    :returns: The trust chain CFArray. The caller is responsible for freeing
        it; it must stay alive for the entire connection, so in practice it
        is stored with a single SSLSocket along with the keychain reference.
    :raises ssl.SSLError: If the files yield neither an identity nor a
        certificate, if the array cannot be allocated, or if a Security call
        reports an error.
    """
    # The strategy:
    #
    # macOS will not give you a SecIdentityRef unless you have imported a key
    # into a keychain. This is a somewhat artificial limitation of macOS (for
    # example, it doesn't necessarily affect iOS), but there is nothing inside
    # Security.framework that lets you get a SecIdentityRef without having a
    # key in a keychain.
    #
    # So we take all the files and iterate them in order. Each one is passed
    # through SecItemImport, which loads one or more objects from it and puts
    # any private key into the keychain.
    #
    # Once we have all the objects, we check what we actually have. If we
    # already hold a SecIdentityRef, we use that. Otherwise we take the first
    # certificate (which we assume to be our leaf) and ask the keychain for a
    # SecIdentityRef with that cert's associated key.
    certificates = []
    identities = []

    try:
        for file_path in paths:
            # Filter out bad paths.
            if not file_path:
                continue
            new_identities, new_certs = _load_items_from_file(keychain, file_path)
            identities.extend(new_identities)
            certificates.extend(new_certs)

        # Ok, we have everything. The question is: do we have an identity? If
        # not, we want to grab one from the first cert we have.
        if not identities:
            if not certificates:
                # Without this check the lookup below would fail with a bare
                # IndexError that says nothing about the actual problem.
                raise ssl.SSLError(
                    "No client certificate or identity found in the given files"
                )

            new_identity = Security.SecIdentityRef()
            status = Security.SecIdentityCreateWithCertificate(
                keychain, certificates[0], ctypes.byref(new_identity)
            )
            _assert_no_error(status)
            identities.append(new_identity)

            # We now want to release the original certificate, as we no
            # longer need it.
            CoreFoundation.CFRelease(certificates.pop(0))

        # We now need to build a new CFArray that holds the trust chain.
        trust_chain = CoreFoundation.CFArrayCreateMutable(
            CoreFoundation.kCFAllocatorDefault,
            0,
            ctypes.byref(CoreFoundation.kCFTypeArrayCallBacks),
        )
        if not trust_chain:
            raise ssl.SSLError("Unable to allocate memory!")

        for item in itertools.chain(identities, certificates):
            # ArrayAppendValue does a CFRetain on the item. That's fine,
            # because the finally block will release our other refs to them.
            CoreFoundation.CFArrayAppendValue(trust_chain, item)

        return trust_chain
    finally:
        for obj in itertools.chain(identities, certificates):
            CoreFoundation.CFRelease(obj)
| |
| |
# Protocol name -> (major, minor) version pair written into a record header.
TLS_PROTOCOL_VERSIONS = {
    "SSLv2": (0, 2),
    "SSLv3": (3, 0),
    "TLSv1": (3, 1),
    "TLSv1.1": (3, 2),
    "TLSv1.2": (3, 3),
}


def _build_tls_unknown_ca_alert(version):
    """
    Build the raw bytes of a TLS alert record for an unknown CA.

    ``version`` is a key of TLS_PROTOCOL_VERSIONS; an unknown name raises
    KeyError. The record is a 5-byte header (record type, major version,
    minor version, 2-byte big-endian payload length) followed by the 2-byte
    alert payload (severity, description).
    """
    major, minor = TLS_PROTOCOL_VERSIONS[version]

    # Alert payload: severity "fatal" (0x02), description "unknown CA" (0x30).
    alert_payload = struct.pack(">BB", 0x02, 0x30)

    # Record header: 0x15 is the "alert" record type.
    record_header = struct.pack(">BBBH", 0x15, major, minor, len(alert_payload))

    return record_header + alert_payload