import socket import pytest from urllib3.poolmanager import PoolKey, key_fn_by_scheme, PoolManager from urllib3 import connection_from_url from urllib3.exceptions import ClosedPoolError, LocationValueError from urllib3.util import retry, timeout from test import resolvesLocalhostFQDN class TestPoolManager(object): @resolvesLocalhostFQDN def test_same_url(self): # Convince ourselves that normally we don't get the same object conn1 = connection_from_url("http://localhost:8081/foo") conn2 = connection_from_url("http://localhost:8081/bar") assert conn1 != conn2 # Now try again using the PoolManager p = PoolManager(1) conn1 = p.connection_from_url("http://localhost:8081/foo") conn2 = p.connection_from_url("http://localhost:8081/bar") assert conn1 == conn2 # Ensure that FQDNs are handled separately from relative domains p = PoolManager(2) conn1 = p.connection_from_url("http://localhost.:8081/foo") conn2 = p.connection_from_url("http://localhost:8081/bar") assert conn1 != conn2 def test_many_urls(self): urls = [ "http://localhost:8081/foo", "http://www.google.com/mail", "http://localhost:8081/bar", "https://www.google.com/", "https://www.google.com/mail", "http://yahoo.com", "http://bing.com", "http://yahoo.com/", ] connections = set() p = PoolManager(10) for url in urls: conn = p.connection_from_url(url) connections.add(conn) assert len(connections) == 5 def test_manager_clear(self): p = PoolManager(5) conn_pool = p.connection_from_url("http://google.com") assert len(p.pools) == 1 conn = conn_pool._get_conn() p.clear() assert len(p.pools) == 0 with pytest.raises(ClosedPoolError): conn_pool._get_conn() conn_pool._put_conn(conn) with pytest.raises(ClosedPoolError): conn_pool._get_conn() assert len(p.pools) == 0 @pytest.mark.parametrize("url", ["http://@", None]) def test_nohost(self, url): p = PoolManager(5) with pytest.raises(LocationValueError): p.connection_from_url(url=url) def test_contextmanager(self): with PoolManager(1) as p: conn_pool = p.connection_from_url("http://google.com") assert len(p.pools) == 1 conn = conn_pool._get_conn() assert len(p.pools) == 0 with pytest.raises(ClosedPoolError): conn_pool._get_conn() conn_pool._put_conn(conn) with pytest.raises(ClosedPoolError): conn_pool._get_conn() assert len(p.pools) == 0 def test_http_pool_key_fields(self): """Assert the HTTPPoolKey fields are honored when selecting a pool.""" connection_pool_kw = { "timeout": timeout.Timeout(3.14), "retries": retry.Retry(total=6, connect=2), "block": True, "strict": True, "source_address": "127.0.0.1", } p = PoolManager() conn_pools = [ p.connection_from_url("http://example.com/"), p.connection_from_url("http://example.com:8000/"), p.connection_from_url("http://other.example.com/"), ] for key, value in connection_pool_kw.items(): p.connection_pool_kw[key] = value conn_pools.append(p.connection_from_url("http://example.com/")) assert all( x is not y for i, x in enumerate(conn_pools) for j, y in enumerate(conn_pools) if i != j ) assert all(isinstance(key, PoolKey) for key in p.pools.keys()) def test_https_pool_key_fields(self): """Assert the HTTPSPoolKey fields are honored when selecting a pool.""" connection_pool_kw = { "timeout": timeout.Timeout(3.14), "retries": retry.Retry(total=6, connect=2), "block": True, "strict": True, "source_address": "127.0.0.1", "key_file": "/root/totally_legit.key", "cert_file": "/root/totally_legit.crt", "cert_reqs": "CERT_REQUIRED", "ca_certs": "/root/path_to_pem", "ssl_version": "SSLv23_METHOD", } p = PoolManager() conn_pools = [ p.connection_from_url("https://example.com/"), p.connection_from_url("https://example.com:4333/"), p.connection_from_url("https://other.example.com/"), ] # Asking for a connection pool with the same key should give us an # existing pool. dup_pools = [] for key, value in connection_pool_kw.items(): p.connection_pool_kw[key] = value conn_pools.append(p.connection_from_url("https://example.com/")) dup_pools.append(p.connection_from_url("https://example.com/")) assert all( x is not y for i, x in enumerate(conn_pools) for j, y in enumerate(conn_pools) if i != j ) assert all(pool in conn_pools for pool in dup_pools) assert all(isinstance(key, PoolKey) for key in p.pools.keys()) def test_default_pool_key_funcs_copy(self): """Assert each PoolManager gets a copy of ``pool_keys_by_scheme``.""" p = PoolManager() assert p.key_fn_by_scheme == p.key_fn_by_scheme assert p.key_fn_by_scheme is not key_fn_by_scheme def test_pools_keyed_with_from_host(self): """Assert pools are still keyed correctly with connection_from_host.""" ssl_kw = { "key_file": "/root/totally_legit.key", "cert_file": "/root/totally_legit.crt", "cert_reqs": "CERT_REQUIRED", "ca_certs": "/root/path_to_pem", "ssl_version": "SSLv23_METHOD", } p = PoolManager(5, **ssl_kw) conns = [p.connection_from_host("example.com", 443, scheme="https")] for k in ssl_kw: p.connection_pool_kw[k] = "newval" conns.append(p.connection_from_host("example.com", 443, scheme="https")) assert all( x is not y for i, x in enumerate(conns) for j, y in enumerate(conns) if i != j ) def test_https_connection_from_url_case_insensitive(self): """Assert scheme case is ignored when pooling HTTPS connections.""" p = PoolManager() pool = p.connection_from_url("https://example.com/") other_pool = p.connection_from_url("HTTPS://EXAMPLE.COM/") assert 1 == len(p.pools) assert pool is other_pool assert all(isinstance(key, PoolKey) for key in p.pools.keys()) def test_https_connection_from_host_case_insensitive(self): """Assert scheme case is ignored when getting the https key class.""" p = PoolManager() pool = p.connection_from_host("example.com", scheme="https") other_pool = p.connection_from_host("EXAMPLE.COM", scheme="HTTPS") assert 1 == len(p.pools) assert pool is other_pool assert all(isinstance(key, PoolKey) for key in p.pools.keys()) def test_https_connection_from_context_case_insensitive(self): """Assert scheme case is ignored when getting the https key class.""" p = PoolManager() context = {"scheme": "https", "host": "example.com", "port": "443"} other_context = {"scheme": "HTTPS", "host": "EXAMPLE.COM", "port": "443"} pool = p.connection_from_context(context) other_pool = p.connection_from_context(other_context) assert 1 == len(p.pools) assert pool is other_pool assert all(isinstance(key, PoolKey) for key in p.pools.keys()) def test_http_connection_from_url_case_insensitive(self): """Assert scheme case is ignored when pooling HTTP connections.""" p = PoolManager() pool = p.connection_from_url("http://example.com/") other_pool = p.connection_from_url("HTTP://EXAMPLE.COM/") assert 1 == len(p.pools) assert pool is other_pool assert all(isinstance(key, PoolKey) for key in p.pools.keys()) def test_http_connection_from_host_case_insensitive(self): """Assert scheme case is ignored when getting the https key class.""" p = PoolManager() pool = p.connection_from_host("example.com", scheme="http") other_pool = p.connection_from_host("EXAMPLE.COM", scheme="HTTP") assert 1 == len(p.pools) assert pool is other_pool assert all(isinstance(key, PoolKey) for key in p.pools.keys()) def test_assert_hostname_and_fingerprint_flag(self): """Assert that pool manager can accept hostname and fingerprint flags.""" fingerprint = "92:81:FE:85:F7:0C:26:60:EC:D6:B3:BF:93:CF:F9:71:CC:07:7D:0A" p = PoolManager(assert_hostname=True, assert_fingerprint=fingerprint) pool = p.connection_from_url("https://example.com/") assert 1 == len(p.pools) assert pool.assert_hostname assert fingerprint == pool.assert_fingerprint def test_http_connection_from_context_case_insensitive(self): """Assert scheme case is ignored when getting the https key class.""" p = PoolManager() context = {"scheme": "http", "host": "example.com", "port": "8080"} other_context = {"scheme": "HTTP", "host": "EXAMPLE.COM", "port": "8080"} pool = p.connection_from_context(context) other_pool = p.connection_from_context(other_context) assert 1 == len(p.pools) assert pool is other_pool assert all(isinstance(key, PoolKey) for key in p.pools.keys()) def test_custom_pool_key(self): """Assert it is possible to define a custom key function.""" p = PoolManager(10) p.key_fn_by_scheme["http"] = lambda x: tuple(x["key"]) pool1 = p.connection_from_url( "http://example.com", pool_kwargs={"key": "value"} ) pool2 = p.connection_from_url( "http://example.com", pool_kwargs={"key": "other"} ) pool3 = p.connection_from_url( "http://example.com", pool_kwargs={"key": "value", "x": "y"} ) assert 2 == len(p.pools) assert pool1 is pool3 assert pool1 is not pool2 def test_override_pool_kwargs_url(self): """Assert overriding pool kwargs works with connection_from_url.""" p = PoolManager(strict=True) pool_kwargs = {"strict": False, "retries": 100, "block": True} default_pool = p.connection_from_url("http://example.com/") override_pool = p.connection_from_url( "http://example.com/", pool_kwargs=pool_kwargs ) assert default_pool.strict assert retry.Retry.DEFAULT == default_pool.retries assert not default_pool.block assert not override_pool.strict assert 100 == override_pool.retries assert override_pool.block def test_override_pool_kwargs_host(self): """Assert overriding pool kwargs works with connection_from_host""" p = PoolManager(strict=True) pool_kwargs = {"strict": False, "retries": 100, "block": True} default_pool = p.connection_from_host("example.com", scheme="http") override_pool = p.connection_from_host( "example.com", scheme="http", pool_kwargs=pool_kwargs ) assert default_pool.strict assert retry.Retry.DEFAULT == default_pool.retries assert not default_pool.block assert not override_pool.strict assert 100 == override_pool.retries assert override_pool.block def test_pool_kwargs_socket_options(self): """Assert passing socket options works with connection_from_host""" p = PoolManager(socket_options=[]) override_opts = [ (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1), (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), ] pool_kwargs = {"socket_options": override_opts} default_pool = p.connection_from_host("example.com", scheme="http") override_pool = p.connection_from_host( "example.com", scheme="http", pool_kwargs=pool_kwargs ) assert default_pool.conn_kw["socket_options"] == [] assert override_pool.conn_kw["socket_options"] == override_opts def test_merge_pool_kwargs(self): """Assert _merge_pool_kwargs works in the happy case""" p = PoolManager(strict=True) merged = p._merge_pool_kwargs({"new_key": "value"}) assert {"strict": True, "new_key": "value"} == merged def test_merge_pool_kwargs_none(self): """Assert false-y values to _merge_pool_kwargs result in defaults""" p = PoolManager(strict=True) merged = p._merge_pool_kwargs({}) assert p.connection_pool_kw == merged merged = p._merge_pool_kwargs(None) assert p.connection_pool_kw == merged def test_merge_pool_kwargs_remove_key(self): """Assert keys can be removed with _merge_pool_kwargs""" p = PoolManager(strict=True) merged = p._merge_pool_kwargs({"strict": None}) assert "strict" not in merged def test_merge_pool_kwargs_invalid_key(self): """Assert removing invalid keys with _merge_pool_kwargs doesn't break""" p = PoolManager(strict=True) merged = p._merge_pool_kwargs({"invalid_key": None}) assert p.connection_pool_kw == merged