-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_custom_payload.py
170 lines (128 loc) · 6.26 KB
/
test_custom_payload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import unittest2 as unittest
except ImportError:
import unittest
import six
from cassandra.query import (SimpleStatement, BatchStatement, BatchType)
from tests.integration import use_singledc, PROTOCOL_VERSION, local, TestCluster
def setup_module():
use_singledc()
#These test rely on the custom payload being returned but by default C*
#ignores all the payloads.
@local
class CustomPayloadTests(unittest.TestCase):
def setUp(self):
if PROTOCOL_VERSION < 4:
raise unittest.SkipTest(
"Native protocol 4,0+ is required for custom payloads, currently using %r"
% (PROTOCOL_VERSION,))
self.cluster = TestCluster()
self.session = self.cluster.connect()
def tearDown(self):
self.cluster.shutdown()
def test_custom_query_basic(self):
"""
Test to validate that custom payloads work with simple queries
creates a simple query and ensures that custom payloads are passed to C*. A custom
query provider is used with C* so we can validate that same custom payloads are sent back
with the results
@since 2.6
@jira_ticket PYTHON-280
@expected_result valid custom payloads should be sent and received
@test_category queries:custom_payload
"""
# Create a simple query statement a
query = "SELECT * FROM system.local"
statement = SimpleStatement(query)
# Validate that various types of custom payloads are sent and received okay
self.validate_various_custom_payloads(statement=statement)
def test_custom_query_batching(self):
"""
Test to validate that custom payloads work with batch queries
creates a batch query and ensures that custom payloads are passed to C*. A custom
query provider is used with C* so we can validate that same custom payloads are sent back
with the results
@since 2.6
@jira_ticket PYTHON-280
@expected_result valid custom payloads should be sent and received
@test_category queries:custom_payload
"""
# Construct Batch Statement
batch = BatchStatement(BatchType.LOGGED)
for i in range(10):
batch.add(SimpleStatement("INSERT INTO test3rf.test (k, v) VALUES (%s, %s)"), (i, i))
# Validate that various types of custom payloads are sent and received okay
self.validate_various_custom_payloads(statement=batch)
def test_custom_query_prepared(self):
"""
Test to validate that custom payloads work with prepared queries
creates a batch query and ensures that custom payloads are passed to C*. A custom
query provider is used with C* so we can validate that same custom payloads are sent back
with the results
@since 2.6
@jira_ticket PYTHON-280
@expected_result valid custom payloads should be sent and received
@test_category queries:custom_payload
"""
# Construct prepared statement
prepared = self.session.prepare(
"""
INSERT INTO test3rf.test (k, v) VALUES (?, ?)
""")
bound = prepared.bind((1, None))
# Validate that various custom payloads are validated correctly
self.validate_various_custom_payloads(statement=bound)
def validate_various_custom_payloads(self, statement):
"""
This is a utility method that given a statement will attempt
to submit the statement with various custom payloads. It will
validate that the custom payloads are sent and received correctly.
@param statement The statement to validate the custom queries in conjunction with
"""
# Simple key value
custom_payload = {'test': b'test_return'}
self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload)
# no key value
custom_payload = {'': b''}
self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload)
# Space value
custom_payload = {' ': b' '}
self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload)
# Long key value pair
key_value = "x" * 10
custom_payload = {key_value: six.b(key_value)}
self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload)
# Max supported value key pairs according C* binary protocol v4 should be 65534 (unsigned short max value)
for i in range(65534):
custom_payload[str(i)] = six.b('x')
self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload)
# Add one custom payload to this is too many key value pairs and should fail
custom_payload[str(65535)] = six.b('x')
with self.assertRaises(ValueError):
self.execute_async_validate_custom_payload(statement=statement, custom_payload=custom_payload)
def execute_async_validate_custom_payload(self, statement, custom_payload):
"""
This is just a simple method that submits a statement with a payload, and validates
that the custom payload we submitted matches the one that we got back
@param statement The statement to execute
@param custom_payload The custom payload to submit with
"""
# Submit the statement with our custom payload. Validate the one
# we receive from the server matches
response_future = self.session.execute_async(statement, custom_payload=custom_payload)
response_future.result()
returned_custom_payload = response_future.custom_payload
self.assertEqual(custom_payload, returned_custom_payload)