1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
"""
Wrapper for the `mu` command line.
"""
import email.message
import email.policy
from email.parser import BytesParser
import subprocess
from subprocess import PIPE
import xml.dom.minidom
import xml.dom
from typing import (
Literal,
Optional,
Union,
)
parser = BytesParser(policy=email.policy.default)
def find_file(id: str) -> Optional[str]:
cmd = subprocess.run(['mu', 'find', '-u', f'i:{id}',
'--fields', 'l'],
stdout=PIPE)
filename = cmd.stdout.decode('UTF-8').strip()
if cmd.returncode == 4:
return None
if cmd.returncode != 0:
raise MuError(cmd.returncode)
return filename
def get_mail(id: str) -> email.message.Message:
"""
Lookup email by Message-ID.
[Raises]
MuError
"""
with open(find_file(id), "rb") as f:
mail = parser.parse(f)
return mail
class MuError(Exception):
codes = {
1: 'General Error',
2: 'No Matches',
4: 'Database is corrupted'
}
def __init__(self, returncode: int):
self.returncode: int = returncode
self.msg: str = MuError.codes.get(returncode, 'Unknown Error')
def __repr__(self):
return f'MuError({self.returncode}, "{self.msg}")'
def __str__(self):
return repr(self)
Sortfield = Union[Literal['cc'],
Literal['c'],
Literal['bcc'],
Literal['h'],
Literal['date'],
Literal['d'],
Literal['from'],
Literal['f'],
Literal['maildir'],
Literal['m'],
Literal['msgid'],
Literal['i'],
Literal['prio'],
Literal['p'],
Literal['subject'],
Literal['s'],
Literal['to'],
Literal['t'],
Literal['list'],
Literal['v']]
def search(query: str,
sortfield: Optional[Sortfield] = 'subject',
reverse: bool = False) -> list[dict[str, str]]:
"""
[Parameters]
query - Search query as per mu-find(1).
sortfield - Field to sort the values by
reverse - If the sort should be reversed
[Returns]
>>> {'from': 'Hugo Hörnquist <hugo@example.com>',
'date': '1585678375',
'size': '377',
'msgid': 'SAMPLE-ID@localhost',
'path': '/home/hugo/.local/var/mail/INBOX/cur/filename',
'maildir': '/INBOX'
}
"""
if not query:
raise ValueError('Query required for mu_search')
cmdline = ['mu', 'find',
'--format=xml',
query]
if sortfield:
cmdline.extend(['--sortfield', sortfield])
if reverse:
cmdline.append('--reverse')
print(cmdline)
cmd = subprocess.run(cmdline, capture_output=True)
if cmd.returncode == 1:
raise MuError(cmd.returncode)
if cmd.returncode == 4:
# no matches
return []
if cmd.returncode != 0:
raise MuError(cmd.returncode)
dom = xml.dom.minidom.parseString(cmd.stdout.decode('UTF-8'))
message_list = []
messages = dom.childNodes[0]
assert messages.localName == 'messages'
for message in messages.childNodes:
msg_dict = {}
if message.nodeType != xml.dom.Node.ELEMENT_NODE:
continue
for kv in message.childNodes:
if kv.nodeType != xml.dom.Node.ELEMENT_NODE:
continue
msg_dict[kv.localName] = kv.childNodes[0].data
message_list.append(msg_dict)
return message_list
def info():
cmd = subprocess.Popen('mu info --nocolor'.split(' '),
stdout=subprocess.PIPE,
text=True)
out = {}
for line in cmd.stdout:
if not line:
continue
if line[0] == '+':
continue
key, *value = [s.strip() for s in line.split('|') if s and not s.isspace()]
out[key] = '|'.join(value)
return out
|