imip-agent

Annotated tests/test_handle.py

1065:2175787d1896
2016-03-04 Paul Boddie Merged changes from the default branch. freebusy-collections
paul@585 1
#!/usr/bin/env python
paul@585 2
paul@585 3
"""
paul@585 4
A handler to help with testing.
paul@585 5
paul@1060 6
Copyright (C) 2014, 2015, 2016 Paul Boddie <paul@boddie.org.uk>
paul@585 7
paul@585 8
This program is free software; you can redistribute it and/or modify it under
paul@585 9
the terms of the GNU General Public License as published by the Free Software
paul@585 10
Foundation; either version 3 of the License, or (at your option) any later
paul@585 11
version.
paul@585 12
paul@585 13
This program is distributed in the hope that it will be useful, but WITHOUT
paul@585 14
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
paul@585 15
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
paul@585 16
details.
paul@585 17
paul@585 18
You should have received a copy of the GNU General Public License along with
paul@585 19
this program.  If not, see <http://www.gnu.org/licenses/>.
paul@585 20
"""
paul@585 21
paul@601 22
from imiptools.client import ClientForObject
paul@667 23
from imiptools.data import Object, get_address, parse_object
paul@800 24
from imiptools.dates import get_datetime, to_timezone
paul@585 25
from imiptools.mail import Messenger
paul@800 26
from imiptools.period import RecurringPeriod
paul@585 27
import imip_store
paul@585 28
import sys
paul@585 29
paul@601 30
class TestClient(ClientForObject):

    """
    A client for exercising request handling in tests, invoked directly rather
    than from within the mail processing pipeline.
    """

    # Action methods.

    def handle_request(self, action, start=None, end=None):

        """
        Respond to the current request on behalf of the current user using the
        given 'action', which should be one of "accept", "decline" or
        "counter".

        For "counter", the given 'start' and 'end' define the main period of
        the counter-proposal, converted to the time zone of the event's
        existing main period.

        Return the outgoing message as a string, or None if the 'action' is not
        recognised or no attendee details are available for the user.
        """

        # Accepting or declining merely changes the participation status.

        if action == "accept":
            attendee_attr = self.update_participation("ACCEPTED")
            method = "REPLY"

        elif action == "decline":
            attendee_attr = self.update_participation("DECLINED")
            method = "REPLY"

        # A counter-proposal replaces the main period of the event.

        elif action == "counter":
            attendee_attr = self.obj.get_value_map("ATTENDEE").get(self.user)
            current = self.obj.get_main_period(self.get_tzid())

            # Interpret the given datetimes in the time zone of the period
            # being replaced.

            tzid = current.tzid
            new_start = to_timezone(get_datetime(start), tzid)
            new_end = to_timezone(get_datetime(end), tzid)

            proposed = RecurringPeriod(
                new_start, new_end, tzid, current.origin,
                current.get_start_attr(), current.get_end_attr()
                )
            self.obj.set_period(proposed)
            method = "COUNTER"

        # Anything else is not handled.

        else:
            return None

        # Without attendee details, there is nothing to send.

        if not attendee_attr:
            return None

        # NOTE: This is a simpler form of the code in imipweb.client.

        recipient = get_address(self.obj.get_value("ORGANIZER"))

        # Reply only on behalf of this user: they become the sole attendee in
        # the outgoing object.

        self.update_sender(attendee_attr)
        self.obj["ATTENDEE"] = [(self.user, attendee_attr)]
        self.update_dtstamp()
        self.update_sequence(False)

        # Address the message to the organiser, copying in the user.

        parts = [self.obj.to_part(method)]
        message = self.messenger.make_outgoing_message(
            parts,
            [recipient],
            outgoing_bcc=get_address(self.user)
            )

        return message.as_string()
paul@585 91
paul@587 92
# A simple main program that attempts to handle a stored request, writing the
paul@585 93
# response message to standard output.
paul@585 94
paul@585 95
if __name__ == "__main__":
paul@585 96
    try:
paul@1060 97
        action, store_dir, journal_dir, preferences_dir, user = sys.argv[1:6]
paul@800 98
        if action == "counter":
paul@1060 99
            start, end = sys.argv[6:8]
paul@1060 100
            i = 8
paul@800 101
        else:
paul@800 102
            start, end = None, None
paul@1060 103
            i = 6
paul@800 104
        uid, recurrenceid = (sys.argv[i:i+2] + [None] * 2)[:2]
paul@585 105
    except ValueError:
paul@800 106
        print >>sys.stderr, """\
paul@1013 107
Need 'accept', 'counter' or 'decline', a store directory, a preferences
paul@1013 108
directory, user URI, any counter-proposal datetimes (see below), plus the
paul@1013 109
appropriate event UID and RECURRENCE-ID (if a recurrence is involved).
paul@800 110
paul@800 111
The RECURRENCE-ID must be in exactly the form employed by the store, not a
paul@800 112
different but equivalent representation.
paul@800 113
paul@800 114
Alternatively, omit the UID and RECURRENCE-ID and provide event-only details on
paul@800 115
standard input to force the script to handle an event not already present in the
paul@800 116
store.
paul@800 117
paul@800 118
If 'counter' has been indicated, alternative start and end datetimes are also
paul@800 119
required.
paul@800 120
"""
paul@585 121
        sys.exit(1)
paul@585 122
paul@585 123
    store = imip_store.FileStore(store_dir)
paul@1060 124
    journal = imip_store.FileJournal(journal_dir)
paul@667 125
paul@667 126
    if uid is not None:
paul@667 127
        fragment = store.get_event(user, uid, recurrenceid)
paul@585 128
paul@667 129
        if not fragment:
paul@667 130
            print >>sys.stderr, "No such event:", uid, recurrenceid
paul@667 131
            sys.exit(1)
paul@667 132
    else:
paul@667 133
        fragment = parse_object(sys.stdin, "utf-8")
paul@585 134
paul@585 135
    obj = Object(fragment)
paul@1060 136
    handler = TestClient(obj, user, Messenger(), store, None, journal, preferences_dir)
paul@800 137
    response = handler.handle_request(action, start, end)
paul@585 138
paul@585 139
    if response:
paul@667 140
        if uid is not None:
paul@667 141
            store.dequeue_request(user, uid, recurrenceid)
paul@585 142
        print response
paul@585 143
    else:
paul@585 144
        sys.exit(1)
paul@585 145
paul@585 146
# vim: tabstop=4 expandtab shiftwidth=4