testgroup.delete()
request.addfinalizer(fin)
+
+def test_sync_repl_invalid_cookie(topology, request):
+ """Test sync_repl with invalid cookie
+
+ :id: 8fa4a8f8-acf4-42a5-90f1-6ba1d8080e46
+ :setup: install a standalone instance
+ :steps:
+ 1. reset instance to standard (no retroCL, no sync_repl, no dynamic plugin)
+ 2. Enable retroCL/content_sync
+ 3. Establish a sync_repl connection
+ 4. Tests servers results to search with invalid cookie
+ 5. Add/delete an user entry to check the server is up and running
+ :expectedresults:
+ 1. Should succeeds
+ 2. Should succeeds
+ 3. Should succeeds
+ 4. Should succeeds
+ 5. Should succeeds
+ """
+
+ # Reset the instance in a default config
+ # Disable content sync plugin
+ topology.standalone.restart()
+ topology.standalone.plugins.disable(name=PLUGIN_REPL_SYNC)
+
+ # Disable retro changelog
+ topology.standalone.plugins.disable(name=PLUGIN_RETRO_CHANGELOG)
+
+ # Disable dynamic plugins
+ topology.standalone.modify_s(DN_CONFIG, [(ldap.MOD_REPLACE, 'nsslapd-dynamic-plugins', b'off')])
+ topology.standalone.restart()
+
+ # Enable retro changelog
+ topology.standalone.plugins.enable(name=PLUGIN_RETRO_CHANGELOG)
+
+ # Enbale content sync plugin
+ topology.standalone.plugins.enable(name=PLUGIN_REPL_SYNC)
+ topology.standalone.restart()
+
+ # Setup the syncer
+ sync = ISyncRepl(topology.standalone)
+
+ # Test invalid cookies
+ cookies = ('#', '##', 'a#a#a', 'a#a#1', 'foo')
+ for invalid_cookie in cookies:
+ log.info('Testing cookie: %s' % invalid_cookie)
+ try:
+ ldap_search = sync.syncrepl_search(base=DEFAULT_SUFFIX,
+ scope=ldap.SCOPE_SUBTREE,
+ attrlist=['objectclass', 'cn', 'homedirectory', 'sn','uid'],
+ filterstr='(|(objectClass=groupofnames)(objectClass=person))',
+ mode='refreshOnly',
+ cookie=invalid_cookie)
+ poll_result = sync.syncrepl_poll(all=1)
+
+ log.fatal('Invalid cookie accepted!')
+ assert False
+ except Exception as e:
+ log.info('Invalid cookie correctly rejected: {}'.format(e.args[0]['info']))
+ pass
+
+ # check that the server is still up and running
+ users = UserAccounts(topology.standalone, DEFAULT_SUFFIX)
+ user = users.create_test_user(uid=1000)
+
+ # Success
+ log.info('Test complete')
+
+ def fin():
+ topology.standalone.restart()
+ try:
+ user.delete()
+ except:
+ pass
+
+ request.addfinalizer(fin)