aboutsummaryrefslogtreecommitdiff
path: root/mm3mod/__init__.py
blob: f5f3b8633a5ca2128686a49be5a10ebc54e60366 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# Copyright (C) 2021 by David Bremner <david@tethera.net>
# Licensed under the LGPL v3 https://www.gnu.org/licenses/lgpl-3.0.html

import os
import cmd2
import configparser
from pathlib import PurePath
from mailmanclient.client import Client
from mailmanclient import Member

class ModShell(cmd2.Cmd):
    """Interactive shell for moderating the held messages of one list.

    The held messages are listed before the first prompt and again after
    every command that does not stop the shell.
    """

    intro = 'Type help or ? for a list of commands'

    # Built-in cmd2 commands that are removed from the shell.
    _DISABLED_COMMANDS = ('shell', 'load', 'pyscript', 'shortcuts', 'alias',
                          'edit', 'history', 'py', 'set', 'unalias')

    def __init__(self, client, mlist):
        """Create a shell bound to one mailing list.

        :param client: The REST client; stored but not otherwise used here.
        :param mlist: The mailing list object whose held messages are
            moderated.  Its ``list_name`` is shown in the prompt.
        """
        super().__init__()
        self.client = client
        self.mlist = mlist
        # Filled in by print_held(); start empty so that completion and
        # "discard page" are safe even before the first listing.
        self.held_messages = []
        self.prompt = '({:s}) '.format(mlist.list_name)
        for command in self._DISABLED_COMMANDS:
            method = 'do_{:s}'.format(command)
            # The attribute is deleted from the shared cmd2.Cmd class, so it
            # is only present the first time a shell is created, and some of
            # these commands do not exist in every cmd2 release.
            if method in vars(cmd2.Cmd):
                delattr(cmd2.Cmd, method)

    def print_held(self):
        """Fetch a page of held messages, remember it, and print it.

        One line is printed per message: request id, subject and sender.
        """
        # NOTE(review): (50, 1) is presumably (count, page) -- confirm
        # against the mailmanclient version in use.
        self.held_messages = self.mlist.get_held_page(50, 1)
        print()
        for msg in self.held_messages:
            print(msg.request_id, msg.subject, msg.sender)
        print()

    def matching_request_ids(self, text):
        """Return the request ids of the listed messages starting with text.

        :param text: The prefix typed so far.
        :return: A list of request ids, as strings.
        """
        request_ids = (str(msg.request_id) for msg in self.held_messages)
        return [rid for rid in request_ids if rid.startswith(text)]

    def _parse_request_id(self, arg):
        """Convert a command argument to an integer request id.

        Prints an error and returns None when arg is not an integer.
        """
        try:
            return int(arg)
        except ValueError:
            print('Illegal request_id: {:s}'.format(arg))
            return None

    def complete_accept(self, text, line, begidx, endidx):
        """Tab completion for ``accept``: the listed request ids."""
        return self.matching_request_ids(text)

    def do_accept(self, arg):
        '''
        accept <number>

        Post held message to mailing list.
        '''
        n = self._parse_request_id(arg)
        if n is not None:
            self.mlist.get_held_message(n).accept()

        return False

    def complete_verify(self, text, line, begidx, endidx):
        """Tab completion for ``verify``: the listed request ids."""
        return self.matching_request_ids(text)

    def do_verify(self, arg):
        '''
        verify <number>

        Post held message, and accept further messages from the same
        sender.
        '''
        n = self._parse_request_id(arg)
        if n is None:
            return False

        msg = self.mlist.get_held_message(n)
        try:
            member = get_nonmember(self.mlist, msg.sender)
        except ValueError as err:
            # The request id was fine; the sender lookup failed.  Report
            # that, rather than blaming the request id.
            print(err)
            return False

        member.moderation_action = 'accept'
        member.save()
        msg.accept()

        return False

    def complete_discard(self, text, line, begidx, endidx):
        """Tab completion for ``discard``: request ids plus ``page``."""
        candidates = self.matching_request_ids(text)
        if "page".startswith(text):
            candidates.append('page')
        return candidates

    def do_discard(self, arg):
        '''
        discard <number> | page

        Discard held message(s) as spam.
        '''
        if arg == 'page':
            # Discard every message of the most recently printed listing.
            for msg in self.held_messages:
                msg.discard()
            return False

        n = self._parse_request_id(arg)
        if n is not None:
            self.mlist.get_held_message(n).discard()

        return False

    def preloop(self):
        """Show the held messages before the first prompt."""
        self.print_held()

    def postcmd(self, stop, line):
        """Re-list the held messages after each command, unless stopping."""
        if not stop:
            self.print_held()
        return stop

# Later versions of mailmanclient provide this as a method of MailingList,
# but version 3.2.1 does not, so it is reimplemented here.

def get_nonmember(mlist, email):
    """Get a nonmember.

    :param mlist: The mailing list object to query; its ``list_id``,
        ``fqdn_listname`` and ``_connection`` attributes are used.
    :param email: The email address of the nonmember for this list.
    :return: A member proxy object.
    :raises ValueError: If the REST call fails with an HTTPError.
    """
    # Imported here so the function is self-contained: the name was
    # previously unbound, which turned every failed lookup into a NameError.
    from urllib.error import HTTPError

    # In order to get the member object we query the REST API for
    # the nonmember. In case there is no matching record, an
    # HTTPError is raised, which is reported as a ValueError instead.
    try:
        path = 'lists/{0}/nonmember/{1}'.format(mlist.list_id, email)
        response, content = mlist._connection.call(path)
        return Member(mlist._connection, content['self_link'], content)
    except HTTPError as err:
        raise ValueError('%s is not a known nonmember address of %s' %
                         (email, mlist.fqdn_listname)) from err

def main():
    """Read the configuration and run the moderation shell.

    The configuration is read from ``mm3mod/config.ini`` below
    ``$XDG_CONFIG_HOME``, or below ``~/.config`` when that variable is
    unset or empty.  The ``[rest]`` section supplies ``baseurl``, ``user``
    (both optional) and ``password``; the ``[list]`` section supplies
    ``name``.
    """
    # expanduser('~') uses $HOME when it is set, and still yields a string
    # when it is not, so an unset HOME no longer makes os.path.join fail
    # with a TypeError.
    xdg_config_home = os.environ.get('XDG_CONFIG_HOME') or \
        os.path.join(os.path.expanduser('~'), '.config')

    config_path = PurePath(xdg_config_home, 'mm3mod', 'config.ini')
    config = configparser.ConfigParser()

    with open(config_path) as f:
        config.read_file(f)

    baseurl = config.get('rest', 'baseurl',
                         fallback="http://localhost:8001/3.1/")
    user = config.get('rest', 'user', fallback='restadmin')
    # No fallback: a missing password or list name is a configuration error.
    password = config.get('rest', 'password')
    list_name = config.get('list', 'name')

    client = Client(baseurl, name=user, password=password)
    mlist = client.get_list(list_name)
    ModShell(client, mlist).cmdloop()

# Start the moderation shell only when run as a script, not when imported.
if __name__ == "__main__":
    main()