from __future__ import with_statement import random import re import socket import mock from nose.tools import eq_ from statsd import StatsClient from statsd import TCPStatsClient ADDR = (socket.gethostbyname('localhost'), 8125) # proto specific methods to get the socket method to send data send_method = { 'udp': lambda x: x.sendto, 'tcp': lambda x: x.sendall, } # proto specific methods to create the expected value make_val = { 'udp': lambda x, addr: mock.call(str.encode(x), addr), 'tcp': lambda x, addr: mock.call(str.encode(x + '\n')), } def _udp_client(prefix=None, addr=None, port=None, ipv6=False): if not addr: addr = ADDR[0] if not port: port = ADDR[1] sc = StatsClient(host=addr, port=port, prefix=prefix, ipv6=ipv6) sc._sock = mock.Mock() return sc def _tcp_client(prefix=None, addr=None, port=None, timeout=None, ipv6=False): if not addr: addr = ADDR[0] if not port: port = ADDR[1] sc = TCPStatsClient(host=addr, port=port, prefix=prefix, timeout=timeout, ipv6=ipv6) sc._sock = mock.Mock() return sc def _timer_check(sock, count, proto, start, end): send = send_method[proto](sock) eq_(send.call_count, count) value = send.call_args[0][0].decode('ascii') exp = re.compile('^%s:\d+|%s$' % (start, end)) assert exp.match(value) def _sock_check(sock, count, proto, val=None, addr=None): send = send_method[proto](sock) eq_(send.call_count, count) if not addr: addr = ADDR if val is not None: eq_( send.call_args, make_val[proto](val, addr), ) class assert_raises(object): """A context manager that asserts a given exception was raised. >>> with assert_raises(TypeError): ... raise TypeError >>> with assert_raises(TypeError): ... raise ValueError AssertionError: ValueError not in ['TypeError'] >>> with assert_raises(TypeError): ... pass AssertionError: No exception raised. Or you can specify any of a number of exceptions: >>> with assert_raises(TypeError, ValueError): ... raise ValueError >>> with assert_raises(TypeError, ValueError): ... raise KeyError AssertionError: KeyError not in ['TypeError', 'ValueError'] You can also get the exception back later: >>> with assert_raises(TypeError) as cm: ... raise TypeError('bad type!') >>> cm.exception TypeError('bad type!') >>> cm.exc_type TypeError >>> cm.traceback Lowercase name because that it's a class is an implementation detail. """ def __init__(self, *exc_cls): self.exc_cls = exc_cls def __enter__(self): # For access to the exception later. return self def __exit__(self, typ, value, tb): assert typ, 'No exception raised.' assert typ in self.exc_cls, '%s not in %s' % ( typ.__name__, [e.__name__ for e in self.exc_cls]) self.exc_type = typ self.exception = value self.traceback = tb # Swallow expected exceptions. return True def _test_incr(cl, proto): cl.incr('foo') _sock_check(cl._sock, 1, proto, val='foo:1|c') cl.incr('foo', 10) _sock_check(cl._sock, 2, proto, val='foo:10|c') cl.incr('foo', 1.2) _sock_check(cl._sock, 3, proto, val='foo:1.2|c') cl.incr('foo', 10, rate=0.5) _sock_check(cl._sock, 4, proto, val='foo:10|c|@0.5') @mock.patch.object(random, 'random', lambda: -1) def test_incr_udp(): """StatsClient.incr works.""" cl = _udp_client() _test_incr(cl, 'udp') @mock.patch.object(random, 'random', lambda: -1) def test_incr_tcp(): """TCPStatsClient.incr works.""" cl = _tcp_client() _test_incr(cl, 'tcp') def _test_decr(cl, proto): cl.decr('foo') _sock_check(cl._sock, 1, proto, 'foo:-1|c') cl.decr('foo', 10) _sock_check(cl._sock, 2, proto, 'foo:-10|c') cl.decr('foo', 1.2) _sock_check(cl._sock, 3, proto, 'foo:-1.2|c') cl.decr('foo', 1, rate=0.5) _sock_check(cl._sock, 4, proto, 'foo:-1|c|@0.5') @mock.patch.object(random, 'random', lambda: -1) def test_decr_udp(): """StatsClient.decr works.""" cl = _udp_client() _test_decr(cl, 'udp') @mock.patch.object(random, 'random', lambda: -1) def test_decr_tcp(): """TCPStatsClient.decr works.""" cl = _tcp_client() _test_decr(cl, 'tcp') def _test_gauge(cl, proto): cl.gauge('foo', 30) _sock_check(cl._sock, 1, proto, 'foo:30|g') cl.gauge('foo', 1.2) _sock_check(cl._sock, 2, proto, 'foo:1.2|g') cl.gauge('foo', 70, rate=0.5) _sock_check(cl._sock, 3, proto, 'foo:70|g|@0.5') @mock.patch.object(random, 'random', lambda: -1) def test_gauge_udp(): """StatsClient.gauge works.""" cl = _udp_client() _test_gauge(cl, 'udp') @mock.patch.object(random, 'random', lambda: -1) def test_gauge_tcp(): """TCPStatsClient.gauge works.""" cl = _tcp_client() _test_gauge(cl, 'tcp') def _test_ipv6(cl, proto, addr): cl.gauge('foo', 30) _sock_check(cl._sock, 1, proto, 'foo:30|g', addr=addr) def test_ipv6_udp(): """StatsClient can use to IPv6 address.""" addr = ('::1', 8125, 0, 0) cl = _udp_client(addr=addr[0], ipv6=True) _test_ipv6(cl, 'udp', addr) def test_ipv6_tcp(): """TCPStatsClient can use to IPv6 address.""" addr = ('::1', 8125, 0, 0) cl = _tcp_client(addr=addr[0], ipv6=True) _test_ipv6(cl, 'tcp', addr) def _test_resolution(cl, proto, addr): cl.incr('foo') _sock_check(cl._sock, 1, proto, 'foo:1|c', addr=addr) def test_ipv6_resolution_udp(): cl = _udp_client(addr='localhost', ipv6=True) _test_resolution(cl, 'udp', ('::1', 8125, 0, 0)) def test_ipv6_resolution_tcp(): cl = _tcp_client(addr='localhost', ipv6=True) _test_resolution(cl, 'tcp', ('::1', 8125, 0, 0)) def test_ipv4_resolution_udp(): cl = _udp_client(addr='localhost') _test_resolution(cl, 'udp', ('127.0.0.1', 8125)) def test_ipv4_resolution_tcp(): cl = _tcp_client(addr='localhost') _test_resolution(cl, 'tcp', ('127.0.0.1', 8125)) def _test_gauge_delta(cl, proto): tests = ( (12, '+12'), (-13, '-13'), (1.2, '+1.2'), (-1.3, '-1.3'), ) def _check(num, result): cl._sock.reset_mock() cl.gauge('foo', num, delta=True) _sock_check(cl._sock, 1, proto, 'foo:%s|g' % result) for num, result in tests: _check(num, result) @mock.patch.object(random, 'random', lambda: -1) def test_gauge_delta_udp(): """StatsClient.gauge works with delta values.""" cl = _udp_client() _test_gauge_delta(cl, 'udp') @mock.patch.object(random, 'random', lambda: -1) def test_gauge_delta_tcp(): """TCPStatsClient.gauge works with delta values.""" cl = _tcp_client() _test_gauge_delta(cl, 'tcp') def _test_gauge_absolute_negative(cl, proto): cl.gauge('foo', -5, delta=False) _sock_check(cl._sock, 1, 'foo:0|g\nfoo:-5|g') @mock.patch.object(random, 'random', lambda: -1) def test_gauge_absolute_negative_udp(): """StatsClient.gauge works with absolute negative value.""" cl = _udp_client() _test_gauge_delta(cl, 'udp') @mock.patch.object(random, 'random', lambda: -1) def test_gauge_absolute_negative_tcp(): """TCPStatsClient.gauge works with absolute negative value.""" cl = _tcp_client() _test_gauge_delta(cl, 'tcp') def _test_gauge_absolute_negative_rate(cl, proto, mock_random): mock_random.return_value = -1 cl.gauge('foo', -1, rate=0.5, delta=False) _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-1|g') mock_random.return_value = 2 cl.gauge('foo', -2, rate=0.5, delta=False) # Should not have changed. _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-1|g') @mock.patch.object(random, 'random') def test_gauge_absolute_negative_rate_udp(mock_random): """StatsClient.gauge works with absolute negative value and rate.""" cl = _udp_client() _test_gauge_absolute_negative_rate(cl, 'udp', mock_random) @mock.patch.object(random, 'random') def test_gauge_absolute_negative_rate_tcp(mock_random): """TCPStatsClient.gauge works with absolute negative value and rate.""" cl = _tcp_client() _test_gauge_absolute_negative_rate(cl, 'tcp', mock_random) def _test_set(cl, proto): cl.set('foo', 10) _sock_check(cl._sock, 1, proto, 'foo:10|s') cl.set('foo', 2.3) _sock_check(cl._sock, 2, proto, 'foo:2.3|s') cl.set('foo', 'bar') _sock_check(cl._sock, 3, proto, 'foo:bar|s') cl.set('foo', 2.3, 0.5) _sock_check(cl._sock, 4, proto, 'foo:2.3|s|@0.5') @mock.patch.object(random, 'random', lambda: -1) def test_set_udp(): """StatsClient.set works.""" cl = _udp_client() _test_set(cl, 'udp') @mock.patch.object(random, 'random', lambda: -1) def test_set_tcp(): """TCPStatsClient.set works.""" cl = _tcp_client() _test_set(cl, 'tcp') def _test_timing(cl, proto): cl.timing('foo', 100) _sock_check(cl._sock, 1, proto, 'foo:100.000000|ms') cl.timing('foo', 350) _sock_check(cl._sock, 2, proto, 'foo:350.000000|ms') cl.timing('foo', 100, rate=0.5) _sock_check(cl._sock, 3, proto, 'foo:100.000000|ms|@0.5') @mock.patch.object(random, 'random', lambda: -1) def test_timing_udp(): """StatsClient.timing works.""" cl = _udp_client() _test_timing(cl, 'udp') @mock.patch.object(random, 'random', lambda: -1) def test_timing_tcp(): """TCPStatsClient.timing works.""" cl = _tcp_client() _test_timing(cl, 'tcp') def _test_prepare(cl, proto): tests = ( ('foo:1|c', ('foo', '1|c', 1)), ('bar:50|ms|@0.5', ('bar', '50|ms', 0.5)), ('baz:23|g', ('baz', '23|g', 1)), ) def _check(o, s, v, r): with mock.patch.object(random, 'random', lambda: -1): eq_(o, cl._prepare(s, v, r)) for o, (s, v, r) in tests: _check(o, s, v, r) @mock.patch.object(random, 'random', lambda: -1) def test_prepare_udp(): """Test StatsClient._prepare method.""" cl = _udp_client() _test_prepare(cl, 'udp') @mock.patch.object(random, 'random', lambda: -1) def test_prepare_tcp(): """Test TCPStatsClient._prepare method.""" cl = _tcp_client() _test_prepare(cl, 'tcp') def _test_prefix(cl, proto): cl.incr('bar') _sock_check(cl._sock, 1, proto, 'foo.bar:1|c') @mock.patch.object(random, 'random', lambda: -1) def test_prefix_udp(): """StatsClient.incr works.""" cl = _udp_client(prefix='foo') _test_prefix(cl, 'udp') @mock.patch.object(random, 'random', lambda: -1) def test_prefix_tcp(): """TCPStatsClient.incr works.""" cl = _tcp_client(prefix='foo') _test_prefix(cl, 'tcp') def _test_timer_manager(cl, proto): with cl.timer('foo'): pass _timer_check(cl._sock, 1, proto, 'foo', 'ms') def test_timer_manager_udp(): """StatsClient.timer can be used as manager.""" cl = _udp_client() _test_timer_manager(cl, 'udp') def test_timer_manager_tcp(): """TCPStatsClient.timer can be used as manager.""" cl = _tcp_client() _test_timer_manager(cl, 'tcp') def _test_timer_decorator(cl, proto): @cl.timer('foo') def foo(a, b): return [a, b] @cl.timer('bar') def bar(a, b): return [b, a] # make sure it works with more than one decorator, called multiple # times, and that parameters are handled correctly eq_([4, 2], foo(4, 2)) _timer_check(cl._sock, 1, proto, 'foo', 'ms') eq_([2, 4], bar(4, 2)) _timer_check(cl._sock, 2, proto, 'bar', 'ms') eq_([6, 5], bar(5, 6)) _timer_check(cl._sock, 3, proto, 'bar', 'ms') def test_timer_decorator_udp(): """StatsClient.timer is a thread-safe decorator (UDP).""" cl = _udp_client() _test_timer_decorator(cl, 'udp') def test_timer_decorator_tcp(): """StatsClient.timer is a thread-safe decorator (TCP).""" cl = _tcp_client() _test_timer_decorator(cl, 'tcp') def _test_timer_capture(cl, proto): with cl.timer('woo') as result: eq_(result.ms, None) assert isinstance(result.ms, float) def test_timer_capture_udp(): """You can capture the output of StatsClient.timer (UDP).""" cl = _udp_client() _test_timer_capture(cl, 'udp') def test_timer_capture_tcp(): """You can capture the output of StatsClient.timer (TCP).""" cl = _tcp_client() _test_timer_capture(cl, 'tcp') def _test_timer_context_rate(cl, proto): with cl.timer('foo', rate=0.5): pass _timer_check(cl._sock, 1, proto, 'foo', 'ms|@0.5') @mock.patch.object(random, 'random', lambda: -1) def test_timer_context_rate_udp(): """StatsClient.timer can be used as manager with rate.""" cl = _udp_client() _test_timer_context_rate(cl, 'udp') @mock.patch.object(random, 'random', lambda: -1) def test_timer_context_rate_tcp(): """TCPStatsClient.timer can be used as manager with rate.""" cl = _tcp_client() _test_timer_context_rate(cl, 'tcp') def _test_timer_decorator_rate(cl, proto): @cl.timer('foo', rate=0.1) def foo(a, b): return [b, a] @cl.timer('bar', rate=0.2) def bar(a, b=2, c=3): return [c, b, a] eq_([2, 4], foo(4, 2)) _timer_check(cl._sock, 1, proto, 'foo', 'ms|@0.1') eq_([3, 2, 5], bar(5)) _timer_check(cl._sock, 2, proto, 'bar', 'ms|@0.2') @mock.patch.object(random, 'random', lambda: -1) def test_timer_decorator_rate_udp(): """StatsClient.timer can be used as decorator with rate.""" cl = _udp_client() _test_timer_decorator_rate(cl, 'udp') @mock.patch.object(random, 'random', lambda: -1) def test_timer_decorator_rate_tcp(): """TCPStatsClient.timer can be used as decorator with rate.""" cl = _tcp_client() _test_timer_decorator_rate(cl, 'tcp') def _test_timer_context_exceptions(cl, proto): with assert_raises(socket.timeout): with cl.timer('foo'): raise socket.timeout() _timer_check(cl._sock, 1, proto, 'foo', 'ms') def test_timer_context_exceptions_udp(): cl = _udp_client() _test_timer_context_exceptions(cl, 'udp') def test_timer_context_exceptions_tcp(): cl = _tcp_client() _test_timer_context_exceptions(cl, 'tcp') def _test_timer_decorator_exceptions(cl, proto): @cl.timer('foo') def foo(): raise ValueError() with assert_raises(ValueError): foo() _timer_check(cl._sock, 1, proto, 'foo', 'ms') def test_timer_decorator_exceptions_udp(): cl = _udp_client() _test_timer_decorator_exceptions(cl, 'udp') def test_timer_decorator_exceptions_tcp(): cl = _tcp_client() _test_timer_decorator_exceptions(cl, 'tcp') def _test_timer_object(cl, proto): t = cl.timer('foo').start() t.stop() _timer_check(cl._sock, 1, proto, 'foo', 'ms') def test_timer_object_udp(): """StatsClient.timer works.""" cl = _udp_client() _test_timer_object(cl, 'udp') def test_timer_object_tcp(): """TCPStatsClient.timer works.""" cl = _tcp_client() _test_timer_object(cl, 'tcp') def _test_timer_object_no_send(cl, proto): t = cl.timer('foo').start() t.stop(send=False) _sock_check(cl._sock, 0, proto) t.send() _timer_check(cl._sock, 1, proto, 'foo', 'ms') def test_timer_object_no_send_udp(): """Stop StatsClient.timer without sending.""" cl = _udp_client() _test_timer_object_no_send(cl, 'udp') def test_timer_object_no_send_tcp(): """Stop TCPStatsClient.timer without sending.""" cl = _tcp_client() _test_timer_object_no_send(cl, 'tcp') def _test_timer_object_rate(cl, proto): t = cl.timer('foo', rate=0.5) t.start() t.stop() _timer_check(cl._sock, 1, proto, 'foo', 'ms@0.5') @mock.patch.object(random, 'random', lambda: -1) def test_timer_object_rate_udp(): """StatsClient.timer works with rate.""" cl = _udp_client() _test_timer_object_rate(cl, 'udp') @mock.patch.object(random, 'random', lambda: -1) def test_timer_object_rate_tcp(): """TCPStatsClient.timer works with rate.""" cl = _tcp_client() _test_timer_object_rate(cl, 'tcp') def _test_timer_object_no_send_twice(cl): t = cl.timer('foo').start() t.stop() with assert_raises(RuntimeError): t.send() def test_timer_object_no_send_twice_udp(): """StatsClient.timer raises RuntimeError if send is called twice.""" cl = _udp_client() _test_timer_object_no_send_twice(cl) def test_timer_object_no_send_twice_tcp(): """TCPStatsClient.timer raises RuntimeError if send is called twice.""" cl = _tcp_client() _test_timer_object_no_send_twice(cl) def _test_timer_send_without_stop(cl): with cl.timer('foo') as t: assert t.ms is None with assert_raises(RuntimeError): t.send() t = cl.timer('bar').start() assert t.ms is None with assert_raises(RuntimeError): t.send() def test_timer_send_without_stop_udp(): """StatsClient.timer raises error if send is called before stop.""" cl = _udp_client() _test_timer_send_without_stop(cl) def test_timer_send_without_stop_tcp(): """TCPStatsClient.timer raises error if send is called before stop.""" cl = _tcp_client() _test_timer_send_without_stop(cl) def _test_timer_object_stop_without_start(cl): with assert_raises(RuntimeError): cl.timer('foo').stop() def test_timer_object_stop_without_start_udp(): """StatsClient.timer raises error if stop is called before start.""" cl = _udp_client() _test_timer_object_stop_without_start(cl) def test_timer_object_stop_without_start_tcp(): """TCPStatsClient.timer raises error if stop is called before start.""" cl = _tcp_client() _test_timer_object_stop_without_start(cl) def _test_pipeline(cl, proto): pipe = cl.pipeline() pipe.incr('foo') pipe.decr('bar') pipe.timing('baz', 320) pipe.send() _sock_check(cl._sock, 1, proto, 'foo:1|c\nbar:-1|c\nbaz:320.000000|ms') def test_pipeline_udp(): """StatsClient.pipeline works.""" cl = _udp_client() _test_pipeline(cl, 'udp') def test_pipeline_tcp(): """TCPStatsClient.pipeline works.""" cl = _tcp_client() _test_pipeline(cl, 'tcp') def _test_pipeline_null(cl, proto): pipe = cl.pipeline() pipe.send() _sock_check(cl._sock, 0, proto) def test_pipeline_null_udp(): """Ensure we don't error on an empty pipeline (UDP).""" cl = _udp_client() _test_pipeline_null(cl, 'udp') def test_pipeline_null_tcp(): """Ensure we don't error on an empty pipeline (TCP).""" cl = _tcp_client() _test_pipeline_null(cl, 'tcp') def _test_pipeline_manager(cl, proto): with cl.pipeline() as pipe: pipe.incr('foo') pipe.decr('bar') pipe.gauge('baz', 15) _sock_check(cl._sock, 1, proto, 'foo:1|c\nbar:-1|c\nbaz:15|g') def test_pipeline_manager_udp(): """StatsClient.pipeline can be used as manager.""" cl = _udp_client() _test_pipeline_manager(cl, 'udp') def test_pipeline_manager_tcp(): """TCPStatsClient.pipeline can be used as manager.""" cl = _tcp_client() _test_pipeline_manager(cl, 'tcp') def _test_pipeline_timer_manager(cl, proto): with cl.pipeline() as pipe: with pipe.timer('foo'): pass _timer_check(cl._sock, 1, proto, 'foo', 'ms') def test_pipeline_timer_manager_udp(): """Timer manager can be retrieve from UDP Pipeline manager.""" cl = _udp_client() _test_pipeline_timer_manager(cl, 'udp') def test_pipeline_timer_manager_tcp(): """Timer manager can be retrieve from TCP Pipeline manager.""" cl = _tcp_client() _test_pipeline_timer_manager(cl, 'tcp') def _test_pipeline_timer_decorator(cl, proto): with cl.pipeline() as pipe: @pipe.timer('foo') def foo(): pass foo() _timer_check(cl._sock, 1, proto, 'foo', 'ms') def test_pipeline_timer_decorator_udp(): """UDP Pipeline manager can be used as decorator.""" cl = _udp_client() _test_pipeline_timer_decorator(cl, 'udp') def test_pipeline_timer_decorator_tcp(): """TCP Pipeline manager can be used as decorator.""" cl = _tcp_client() _test_pipeline_timer_decorator(cl, 'tcp') def _test_pipeline_timer_object(cl, proto): with cl.pipeline() as pipe: t = pipe.timer('foo').start() t.stop() _sock_check(cl._sock, 0, proto) _timer_check(cl._sock, 1, proto, 'foo', 'ms') def test_pipeline_timer_object_udp(): """Timer from UDP Pipeline manager works.""" cl = _udp_client() _test_pipeline_timer_object(cl, 'udp') def test_pipeline_timer_object_tcp(): """Timer from TCP Pipeline manager works.""" cl = _tcp_client() _test_pipeline_timer_object(cl, 'tcp') def _test_pipeline_empty(cl): with cl.pipeline() as pipe: pipe.incr('foo') eq_(1, len(pipe._stats)) eq_(0, len(pipe._stats)) def test_pipeline_empty_udp(): """Pipelines should be empty after a send() call (UDP).""" cl = _udp_client() _test_pipeline_empty(cl) def test_pipeline_empty_tcp(): """Pipelines should be empty after a send() call (TCP).""" cl = _tcp_client() _test_pipeline_empty(cl) def _test_pipeline_negative_absolute_gauge(cl, proto): with cl.pipeline() as pipe: pipe.gauge('foo', -10, delta=False) pipe.incr('bar') _sock_check(cl._sock, 1, proto, 'foo:0|g\nfoo:-10|g\nbar:1|c') def test_pipeline_negative_absolute_gauge_udp(): """Negative absolute gauges use an internal pipeline (UDP).""" cl = _udp_client() _test_pipeline_negative_absolute_gauge(cl, 'udp') def test_pipeline_negative_absolute_gauge_tcp(): """Negative absolute gauges use an internal pipeline (TCP).""" cl = _tcp_client() _test_pipeline_negative_absolute_gauge(cl, 'tcp') def _test_big_numbers(cl, proto): num = 1234568901234 tests = ( # Explicitly create strings so we avoid the bug we're trying to test. ('gauge', 'foo:1234568901234|g'), ('incr', 'foo:1234568901234|c'), ('timing', 'foo:1234568901234.000000|ms'), ) def _check(method, result): cl._sock.reset_mock() getattr(cl, method)('foo', num) _sock_check(cl._sock, 1, proto, result) for method, result in tests: _check(method, result) def test_big_numbers_udp(): """Test big numbers with UDP client.""" cl = _udp_client() _test_big_numbers(cl, 'udp') def test_big_numbers_tcp(): """Test big numbers with TCP client.""" cl = _tcp_client() _test_big_numbers(cl, 'tcp') def _test_rate_no_send(cl, proto): cl.incr('foo', rate=0.5) _sock_check(cl._sock, 0, proto) @mock.patch.object(random, 'random', lambda: 2) def test_rate_no_send_udp(): """Rate below random value prevents sending with StatsClient.incr.""" cl = _udp_client() _test_rate_no_send(cl, 'udp') @mock.patch.object(random, 'random', lambda: 2) def test_rate_no_send_tcp(): """Rate below random value prevents sending with TCPStatsClient.incr.""" cl = _tcp_client() _test_rate_no_send(cl, 'tcp') def test_socket_error(): """Socket error on StatsClient should be ignored.""" cl = _udp_client() cl._sock.sendto.side_effect = socket.timeout() cl.incr('foo') _sock_check(cl._sock, 1, 'udp', 'foo:1|c') def test_pipeline_packet_size(): """Pipelines shouldn't send packets larger than 512 bytes (UDP only).""" sc = _udp_client() pipe = sc.pipeline() for x in range(32): # 32 * 16 = 512, so this will need 2 packets. pipe.incr('sixteen_char_str') pipe.send() eq_(2, sc._sock.sendto.call_count) assert len(sc._sock.sendto.call_args_list[0][0][0]) <= 512 assert len(sc._sock.sendto.call_args_list[1][0][0]) <= 512 @mock.patch.object(socket, 'socket') def test_tcp_raises_exception_to_user(mock_socket): """Socket errors in TCPStatsClient should be raised to user.""" addr = ('127.0.0.1', 1234) cl = _tcp_client(addr=addr[0], port=addr[1]) cl.incr('foo') cl._sock.sendall.assert_called_once() cl._sock.sendall.side_effect = socket.error with assert_raises(socket.error): cl.incr('foo') @mock.patch.object(socket, 'socket') def test_tcp_timeout(mock_socket): """Timeout on TCPStatsClient should be set on socket.""" test_timeout = 321 cl = TCPStatsClient(timeout=test_timeout) cl.incr('foo') cl._sock.settimeout.assert_called_once() cl._sock.settimeout.assert_called_with(test_timeout)