Coverage for pyotrs/lib.py: 93%
717 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-06 19:25 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-06 19:25 +0000
1""" lib.py
3PyOTRS lib
5This code implements the PyOTRS library to provide access to the OTRS API (REST)
6"""
8import base64
9import datetime
10import json
11import logging
12import mimetypes
13import os
14import time
16import deprecation
17import requests
19from .version import __version__
# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)

# Default mapping for the ticket connector web service: for every operation
# the HTTP method, the route and the key under which the result is expected.
TICKET_CONNECTOR_CONFIG_DEFAULT = {
    "Name": "GenericTicketConnectorREST",
    "Config": {
        "SessionCreate": {
            "RequestMethod": "POST",
            "Route": "/Session",
            "Result": "SessionID",
        },
        "AccessTokenCreate": {
            "RequestMethod": "POST",
            "Route": "/Session",
            "Result": "AccessToken",
        },
        "SessionGet": {
            "RequestMethod": "GET",
            "Route": "/Session/:SessionID",
            "Result": "SessionData",
        },
        "TicketCreate": {
            "RequestMethod": "POST",
            "Route": "/Ticket",
            "Result": "TicketID",
        },
        "TicketGet": {
            "RequestMethod": "GET",
            "Route": "/Ticket/:TicketID",
            "Result": "Ticket",
        },
        "TicketGetList": {
            "RequestMethod": "GET",
            "Route": "/TicketList",
            "Result": "Ticket",
        },
        "TicketHistoryGet": {
            "RequestMethod": "GET",
            "Route": "/TicketHistory/:TicketID",
            "Result": "TicketHistory",
        },
        "TicketSearch": {
            "RequestMethod": "GET",
            "Route": "/Ticket",
            "Result": "TicketID",
        },
        "TicketUpdate": {
            "RequestMethod": "PATCH",
            "Route": "/Ticket/:TicketID",
            "Result": "TicketID",
        },
    },
}
# Default mapping for the link connector web service: for every operation
# the HTTP method, the route and the key under which the result is expected.
LINK_CONNECTOR_CONFIG_DEFAULT = {
    "Name": "GenericLinkConnectorREST",
    "Config": {
        "LinkAdd": {
            "RequestMethod": "POST",
            "Route": "/LinkAdd",
            "Result": "LinkAdd",
        },
        "LinkDelete": {
            "RequestMethod": "DELETE",
            "Route": "/LinkDelete",
            "Result": "LinkDelete",
        },
        "LinkDeleteAll": {
            "RequestMethod": "DELETE",
            "Route": "/LinkDeleteAll",
            "Result": "LinkDeleteAll",
        },
        "LinkList": {
            "RequestMethod": "GET",
            "Route": "/LinkList",
            "Result": "LinkList",
        },
        "PossibleLinkList": {
            "RequestMethod": "GET",
            "Route": "/PossibleLinkList",
            "Result": "PossibleLinkList",
        },
        "PossibleObjectsList": {
            "RequestMethod": "GET",
            "Route": "/PossibleObjectsList",
            "Result": "PossibleObject",
        },
        "PossibleTypesList": {
            "RequestMethod": "GET",
            "Route": "/PossibleTypesList",
            "Result": "PossibleType",
        },
    },
}
# Check if OS is Linux or Windows (#47):
# OS_TYPE_WINDOWS stays False unless platform.system() reports Windows. Any
# other result is treated (and logged) as "Linux".
OS_TYPE_WINDOWS = False
try:
    from platform import system

    if 'Windows' in system():
        OS_TYPE_WINDOWS = True
        log.debug("OS Type: Windows")
    else:
        log.debug("OS Type: Linux")
except Exception:
    # Best effort only: fall back to the default instead of failing the import.
    # "except Exception" (instead of a bare except) lets KeyboardInterrupt and
    # SystemExit propagate.
    log.warning("Could not determine OS-Type (Linux/Windows). Using default: 'Linux'.")
class PyOTRSError(Exception):
    """Base class of all PyOTRS exceptions; keeps the text in ``self.message``."""

    def __init__(self, message):
        self.message = message
        super().__init__(message)


class ArgumentMissingError(PyOTRSError):
    """A required argument was not provided."""


class ArgumentInvalidError(PyOTRSError):
    """An argument (or a combination of arguments) is not acceptable."""


class ResponseParseError(PyOTRSError):
    """Error related to parsing a response."""


class SessionCreateError(PyOTRSError):
    """A Session ID could not be created."""


class SessionNotCreated(PyOTRSError):  # noqa: N818
    """Session related error (no session has been created)."""


class APIError(PyOTRSError):
    """API related error."""


class HTTPError(PyOTRSError):
    """HTTP related error."""
class Article:
    """PyOTRS Article class

    Wraps one article given as dict. The keys "Attachment" and "DynamicField" are
    parsed into lists of **Attachment** / **DynamicField** objects and removed from
    ``fields``; all other keys stay in ``fields``.

    Args:
        dct (dict): Article data as dict

    Attributes:
        aid (int): value of "ArticleID" as int (0 if missing or not a number)
        fields (dict): all remaining top level fields
        attachments (list): list of **Attachment** objects
        dynamic_fields (list): list of **DynamicField** objects

    """

    def __init__(self, dct):
        # work on a shallow copy, the keys popped below must not vanish from the caller's dict
        fields = dict(dct)

        try:
            self.aid = int(fields.get("ArticleID"))
        except (TypeError, ValueError):
            # TypeError: no ArticleID at all (None); ValueError: not a number
            self.aid = 0

        self.fields = fields

        self.attachments = self._parse_attachments()
        self.fields.pop("Attachment", None)

        self.dynamic_fields = self._parse_dynamic_fields()
        self.fields.pop("DynamicField", None)

    def __repr__(self):
        if self.aid != 0:
            _len = len(self.attachments)
            if _len == 0:
                return f"<ArticleID: {self.aid}>"
            elif _len == 1:
                return f"<ArticleID: {self.aid} (1 Attachment)>"
            else:
                return f"<ArticleID: {self.aid} ({_len} Attachments)>"
        else:
            return f"<{self.__class__.__name__}>"

    def to_dct(self, attachments=True, attachment_cont=True, dynamic_fields=True):
        """represent as nested dict compatible for OTRS

        Args:
            attachments (bool): if True will include, otherwise exclude:
                "Attachment" (default: True)
            attachment_cont (bool): if True will include, otherwise exclude:
                "Attachment" > "Content" (default: True)
            dynamic_fields (bool): if True will include, otherwise exclude:
                "DynamicField" (default: True)

        Returns:
            **dict**: Article represented as dict for OTRS

        """
        dct = {}

        if attachments and self.attachments:
            dct["Attachment"] = [x.to_dct(content=attachment_cont) for x in self.attachments]

        if dynamic_fields and self.dynamic_fields:
            dct["DynamicField"] = [x.to_dct() for x in self.dynamic_fields]

        if self.fields:
            dct.update(self.fields)

        return dct

    def _parse_attachments(self):
        """parse Attachment from Article and return as **list** of **Attachment** objects"""
        lst = self.fields.get("Attachment")
        if lst:
            return [Attachment(item) for item in lst]
        return []

    def _parse_dynamic_fields(self):
        """parse DynamicField from Article and return as **list** of **DynamicField** objects"""
        lst = self.fields.get("DynamicField")
        if lst:
            return [DynamicField.from_dct(item) for item in lst]
        return []

    def attachment_get(self, a_filename):
        """attachment_get

        Args:
            a_filename (str): Filename of Attachment to retrieve

        Returns:
            **Attachment** or **None** (first match wins)

        """
        wanted = f"{a_filename}"
        for attachment in self.attachments:
            # an Attachment without "Filename" can never match (and must not raise)
            if getattr(attachment, "Filename", None) == wanted:
                return attachment
        return None

    def dynamic_field_get(self, df_name):
        """dynamic_field_get

        Args:
            df_name (str): Name of DynamicField to retrieve

        Returns:
            **DynamicField** or **None** (first match wins)

        """
        wanted = f"{df_name}"
        for dynamic_field in self.dynamic_fields:
            if dynamic_field.name == wanted:
                return dynamic_field
        return None

    def field_get(self, f_name):
        """return the value of field *f_name* or **None** if it is not set"""
        return self.fields.get(f_name)

    def validate(self, validation_map=None):
        """validate data against a mapping dict - if a key is not present
        then set it with a default value according to dict

        Args:
            validation_map (dict): A mapping for all Article fields that have to be set. During
                validation every required field that is not set will be set to a default value
                specified in this dict. If omitted a built-in mapping (Body, Charset,
                MimeType, Subject, TimeUnit) is used.

        .. note::
            A field counts as "not set" when its value is falsy (e.g. None, "" or 0). This
            method only adds/overwrites fields - it does not remove any.

        """
        if not validation_map:
            validation_map = {"Body": "API created Article Body",
                              "Charset": "UTF8",
                              "MimeType": "text/plain",
                              "Subject": "API created Article",
                              "TimeUnit": 0}

        for key, value in validation_map.items():
            if not self.fields.get(key, None):
                self.fields[key] = value

    @classmethod
    def _dummy(cls):
        """dummy data (for testing)

        Returns:
            **Article**: An Article object.

        """
        return cls({"Subject": "Dümmy Subject",
                    "Body": "Hallo Bjørn,\n[kt]\n\n -- The End",
                    "TimeUnit": 0,
                    "MimeType": "text/plain",
                    "Charset": "UTF8"})

    @classmethod
    def _dummy_force_notify(cls):
        """dummy data (for testing) - includes "ForceNotificationToUserID"

        Returns:
            **Article**: An Article object.

        """
        return cls({"Subject": "Dümmy Subject",
                    "Body": "Hallo Bjørn,\n[kt]\n\n -- The End",
                    "TimeUnit": 0,
                    "MimeType": "text/plain",
                    "Charset": "UTF8",
                    "ForceNotificationToUserID": [1, 2]})
class Attachment:
    """PyOTRS Attachment class

    Every key of the dict passed in becomes an attribute of the object (typically
    "Content", "ContentType" and "Filename").

    Args:
        dct (dict): Attachment data as dict

    """

    def __init__(self, dct):
        # copy: the object must not share (and silently modify) the caller's dict
        self.__dict__ = dict(dct)

    def __repr__(self):
        if hasattr(self, 'Filename'):
            return f"<{self.__class__.__name__}: {self.Filename}>"
        else:
            return f"<{self.__class__.__name__}>"

    def to_dct(self, content=True):
        """represent Attachment object as dict

        Args:
            content (bool): if True will include, otherwise exclude: "Content" (default: True)

        Returns:
            **dict**: Attachment represented as dict (a copy - changing it does not
            change the Attachment).

        """
        # work on a copy, otherwise excluding "Content" would delete it from the object itself
        dct = dict(self.__dict__)
        if not content:
            dct.pop("Content", None)
        return dct

    @classmethod
    def create_basic(cls, Content=None, ContentType=None, Filename=None):  # noqa: N803
        """create a basic Attachment object

        Args:
            Content (str): base64 encoded content
            ContentType (str): MIME type of content (e.g. text/plain)
            Filename (str): file name (e.g. file.txt)

        Returns:
            **Attachment**: An Attachment object.

        """
        return cls({'Content': Content,
                    'ContentType': ContentType,
                    'Filename': Filename})

    @classmethod
    def create_from_file(cls, file_path):
        """create an Attachment from a file on disc

        The content is read in binary mode and stored base64 encoded; the content type is
        guessed from the file name ("application/octet-stream" if it can not be guessed).

        Args:
            file_path (str): The full path to the file from which an Attachment should be created.

        Returns:
            **Attachment**: An Attachment object.

        """
        with open(file_path, 'rb') as f:
            content = f.read()

        content_type = mimetypes.guess_type(file_path)[0]
        if not content_type:
            content_type = "application/octet-stream"
        return cls({'Content': base64.b64encode(content).decode('utf-8'),
                    'ContentType': content_type,
                    'Filename': os.path.basename(file_path)})

    def save_to_dir(self, folder="/tmp"):
        """save Attachment to a folder on disc

        Args:
            folder (str): The directory where this attachment should be saved to.

        Raises:
            ValueError: if "Content" or "Filename" is missing or if "Filename" does not
                contain a usable file name.

        Returns:
            **bool**: True

        """
        if not hasattr(self, 'Content') or not hasattr(self, 'Filename'):
            raise ValueError("invalid Attachment")

        # Filename is external data: only use its last path component so that an
        # absolute path or "../" can not make us write outside of *folder*
        file_name = os.path.basename(self.Filename)
        if file_name in ("", ".", ".."):
            raise ValueError("invalid Attachment")

        file_path = os.path.join(os.path.abspath(folder), file_name)
        with open(file_path, 'wb') as f:
            f.write(base64.b64decode(self.Content))

        return True

    @classmethod
    def _dummy(cls):
        """dummy data (for testing)

        Returns:
            **Attachment**: An Attachment object.

        """
        return cls.create_basic("YmFyCg==", "text/plain", "dümmy.txt")
class DynamicField:
    """PyOTRS DynamicField class

    Args:
        name (str): Name of OTRS DynamicField (required)
        value (str): Value of OTRS DynamicField
        search_operator (str): Search operator (defaults to: "Equals")
            Valid options are:
            "Equals", "Like", "GreaterThan", "GreaterThanEquals",
            "SmallerThan", "SmallerThanEquals"
        search_patterns (list): List of patterns (str or datetime) to search for;
            anything that is not a list is wrapped into a one element list

    Raises:
        NotImplementedError: if search_operator is not one of the valid options

    .. warning::
        **PyOTRS only supports OTRS 5 style!**
        DynamicField representation changed between OTRS 4 and OTRS 5.

    """

    SEARCH_OPERATORS = (
        "Equals",
        "Like",
        "GreaterThan",
        "GreaterThanEquals",
        "SmallerThan",
        "SmallerThanEquals",
    )

    def __init__(self, name, value=None, search_patterns=None, search_operator="Equals"):
        self.name = name
        self.value = value
        self.search_patterns = (search_patterns if isinstance(search_patterns, list)
                                else [search_patterns])

        if search_operator not in DynamicField.SEARCH_OPERATORS:
            raise NotImplementedError(f"Invalid Operator: \"{search_operator}\"")
        self.search_operator = search_operator

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.name}: {self.value}>"

    @classmethod
    def from_dct(cls, dct):
        """build a DynamicField from a dict

        Args:
            dct (dict): needs the keys "Name" and "Value"

        Returns:
            **DynamicField**: A DynamicField object.

        """
        return cls(name=dct["Name"], value=dct["Value"])

    def to_dct(self):
        """represent DynamicField as dict

        Returns:
            **dict**: DynamicField as dict (keys "Name" and "Value").

        """
        return {"Name": self.name, "Value": self.value}

    def to_dct_search(self):
        """represent DynamicField as dict for search operations

        datetime patterns are formatted as "YYYY-MM-DD HH:MM:SS"; for the operator "Like"
        every pattern without a "*" gets wrapped in "*".

        Returns:
            **dict**: DynamicField as dict for search operations

        """
        def _as_search_pattern(pattern):
            if isinstance(pattern, datetime.datetime):
                pattern = pattern.strftime("%Y-%m-%d %H:%M:%S")
            if self.search_operator == 'Like' and '*' not in pattern:
                pattern = f"*{pattern}*"
            return pattern

        patterns = [_as_search_pattern(entry) for entry in self.search_patterns]
        return {f"DynamicField_{self.name}": {self.search_operator: patterns}}

    @classmethod
    def _dummy1(cls):
        """dummy1 data (for testing)

        Returns:
            **DynamicField**: A DynamicField object.

        """
        return DynamicField(name="firstname", value="Jane")

    @classmethod
    def _dummy2(cls):
        """dummy2 data (for testing)

        Returns:
            **DynamicField**: A DynamicField object.

        """
        return DynamicField.from_dct({'Name': 'lastname', 'Value': 'Doe'})
class Ticket:
    """PyOTRS Ticket class

    Args:
        dct (dict): Ticket data as dict. The keys "Article" and "DynamicField" are parsed
            into objects and removed from ``fields``.

    Attributes:
        tid (int): OTRS Ticket ID as integer (0 if "TicketID" is not present)
        fields (dict): OTRS Top Level fields
        articles (list): List of Article objects
        dynamic_fields (list): List of DynamicField objects

    """

    def __init__(self, dct):
        # store OTRS Top Level fields (as a copy of the dict passed in)
        self.fields = dict(dct)

        self.tid = int(self.fields.get("TicketID", 0))
        self.articles = self._parse_articles()
        self.fields.pop("Article", None)

        self.dynamic_fields = self._parse_dynamic_fields()
        self.fields.pop("DynamicField", None)

    def __repr__(self):
        if self.tid:
            return f"<{self.__class__.__name__}: {self.tid}>"
        else:
            return f"<{self.__class__.__name__}>"

    def _parse_articles(self):
        """parse Article from Ticket and return as **list** of **Article** objects"""
        lst = self.fields.get("Article", [])
        return [Article(item) for item in lst]

    def _parse_dynamic_fields(self):
        """parse DynamicField from Ticket and return as **list** of **DynamicField** objects"""
        lst = self.fields.get("DynamicField", [])
        return [DynamicField.from_dct(item) for item in lst]

    def to_dct(self,
               articles=True,
               article_attachments=True,
               article_attachment_cont=True,
               article_dynamic_fields=True,
               dynamic_fields=True):
        """represent as nested dict

        Args:
            articles (bool): if True will include, otherwise exclude:
                "Article" (default: True)
            article_attachments (bool): if True will include, otherwise exclude:
                "Article" > "Attachment" (default: True)
            article_attachment_cont (bool): if True will include, otherwise exclude:
                "Article" > "Attachment" > "Content" (default: True)
            article_dynamic_fields (bool): if True will include, otherwise exclude:
                "Article" > "DynamicField" (default: True)
            dynamic_fields (bool): if True will include, otherwise exclude:
                "DynamicField" (default: True)

        Returns:
            **dict**: Ticket represented as dict: ``{"Ticket": {...}}``.

        .. note::
            "Article" and "DynamicField" are only added if the corresponding list is
            not empty.

        """
        dct = dict(self.fields)

        if articles:
            # best effort: an AttributeError raised while collecting is ignored
            try:
                if self.articles:
                    dct["Article"] = [x.to_dct(attachments=article_attachments,
                                               attachment_cont=article_attachment_cont,
                                               dynamic_fields=article_dynamic_fields)
                                      for x in self.articles]
            except AttributeError:
                pass

        if dynamic_fields:
            try:
                if self.dynamic_fields:
                    dct["DynamicField"] = [x.to_dct() for x in self.dynamic_fields]
            except AttributeError:
                pass

        return {"Ticket": dct}

    def article_get(self, aid):
        """article_get

        Args:
            aid (str): Article ID as either int or str

        Returns:
            **Article** or **None**

        """
        wanted = str(aid)
        for article in self.articles:
            current = article.field_get("ArticleID")
            # compare as str on both sides: the stored ArticleID may be an int or a str
            if current is not None and str(current) == wanted:
                return article
        return None

    def dynamic_field_get(self, df_name):
        """dynamic_field_get

        Args:
            df_name (str): Name of DynamicField to retrieve

        Returns:
            **DynamicField** or **None**

        """
        result = [x for x in self.dynamic_fields if x.name == df_name]
        return result[0] if result else None

    def field_get(self, f_name):
        """return the value of the top level field *f_name* or **None** if it is not set"""
        return self.fields.get(f_name)

    def field_add(self, fields=None, **kwargs):
        """add fields to ticket

        Provide a dictionary of key:value pairs to be added to the ticket,
        e.g. the basic ticket.

        Use it like `field_add({"SLA": "1h", "Service": "Ticket-Service"})` or via kwargs
        by providing key=value pairs: `field_add(SLA="1h", Service="Ticket-Service")`.

        Args:
            fields (dict): Dictionary of new fields to be added to the
                Ticket instance (anything that is not a dict is ignored).

        Returns:
            **dict**: The current fields attribute of the ticket.

        """
        if isinstance(fields, dict):
            self.fields.update(fields)

        if kwargs:
            self.fields.update(kwargs)

        return self.fields

    @classmethod
    def create_basic(cls,
                     Title=None,  # noqa: N803
                     QueueID=None,  # noqa: N803
                     Queue=None,  # noqa: N803
                     TypeID=None,  # noqa: N803
                     Type=None,  # noqa: N803
                     StateID=None,  # noqa: N803
                     State=None,  # noqa: N803
                     PriorityID=None,  # noqa: N803
                     Priority=None,  # noqa: N803
                     CustomerUser=None):  # noqa: N803
        """create basic ticket

        Where both a name and an ID are accepted (Queue, State, Priority) the name wins
        if both are given.

        Args:
            Title (str): OTRS Ticket Title
            QueueID (str): OTRS Ticket QueueID (e.g. "1")
            Queue (str): OTRS Ticket Queue (e.g. "raw")
            TypeID (str): OTRS Ticket TypeID (e.g. "1")
            Type (str): OTRS Ticket Type (e.g. "Problem")
            StateID (str): OTRS Ticket StateID (e.g. "1")
            State (str): OTRS Ticket State (e.g. "open" or "new")
            PriorityID (str): OTRS Ticket PriorityID (e.g. "1")
            Priority (str): OTRS Ticket Priority (e.g. "low")
            CustomerUser (str): OTRS Ticket CustomerUser

        Raises:
            ArgumentMissingError: if Title, CustomerUser or one of Queue/QueueID,
                State/StateID, Priority/PriorityID is missing
            ArgumentInvalidError: if both Type and TypeID are given

        Returns:
            **Ticket**: A new Ticket object.

        """
        if not Title:
            raise ArgumentMissingError("Title is required")

        if not Queue and not QueueID:
            raise ArgumentMissingError("Either Queue or QueueID required")

        if not State and not StateID:
            raise ArgumentMissingError("Either State or StateID required")

        if not Priority and not PriorityID:
            raise ArgumentMissingError("Either Priority or PriorityID required")

        if not CustomerUser:
            raise ArgumentMissingError("CustomerUser is required")

        if Type and TypeID:
            raise ArgumentInvalidError("Either Type or TypeID - not both")

        dct = {"Title": Title}

        if Queue:
            dct["Queue"] = Queue
        else:
            dct["QueueID"] = QueueID

        # Type is optional; at most one of Type/TypeID is set at this point
        if Type:
            dct["Type"] = Type
        elif TypeID:
            dct["TypeID"] = TypeID

        if State:
            dct["State"] = State
        else:
            dct["StateID"] = StateID

        if Priority:
            dct["Priority"] = Priority
        else:
            dct["PriorityID"] = PriorityID

        dct["CustomerUser"] = CustomerUser

        return cls(dct)

    @classmethod
    def _dummy(cls):
        """dummy data (for testing)

        Returns:
            **Ticket**: A Ticket object.

        """
        return cls.create_basic(Queue="Raw",
                                State="open",
                                Priority="3 normal",
                                CustomerUser="root@localhost",
                                Title="Bäsic Ticket")

    @staticmethod
    def datetime_to_pending_time_text(datetime_object=None):
        """datetime_to_pending_time_text

        Args:
            datetime_object (Datetime): required (despite the default of None)

        Returns:
            **dict**: The pending time as dict with the keys "Year", "Month", "Day",
            "Hour" and "Minute".

        """
        return {
            "Year": datetime_object.year,
            "Month": datetime_object.month,
            "Day": datetime_object.day,
            "Hour": datetime_object.hour,
            "Minute": datetime_object.minute
        }
class SessionStore:
    """Session ID: persistently store to and retrieve from file

    Args:
        file_path (str): Path on disc
        session_timeout (int): OTRS Session Timeout Value (to avoid reusing outdated session id)
        value (str): A Session ID as str
        created (int): seconds as epoch when a session_id record was created
        expires (int): seconds as epoch when a session_id record expires
        is_legacy (bool): whether the Session ID is for an older OTRS Version (<=7)

    Raises:
        ArgumentMissingError: if file_path or session_timeout is missing

    .. note::
        After a successful read() the attributes *created* and *expires* hold naive
        datetime objects (UTC).

    """

    # unix permission "read + write for the owner only" (decimal 384)
    FILE_MODE = 0o600

    def __init__(self, file_path=None, session_timeout=None,
                 value=None, created=None, expires=None, is_legacy=False):
        if not file_path:
            raise ArgumentMissingError("Argument file_path is required!")

        if not session_timeout:
            raise ArgumentMissingError("Argument session_timeout is required!")

        self.file_path = file_path
        self.timeout = session_timeout
        self.value = value
        self.created = created
        self.expires = expires
        self.is_legacy = is_legacy

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.file_path}>"

    @staticmethod
    def _utc_now():
        """current time as naive datetime in UTC (replacement for deprecated utcnow())"""
        return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _utc_from_timestamp(timestamp):
        """epoch seconds as naive datetime in UTC (replacement for utcfromtimestamp())"""
        return datetime.datetime.fromtimestamp(
            timestamp, tz=datetime.timezone.utc).replace(tzinfo=None)

    def read(self):
        """Retrieve a stored Session ID from file

        Also updates *value*, *is_legacy*, *created* and *expires* from the file content.

        Raises:
            Exception: if the file content causes an unexpected error (anything but
                ValueError/KeyError); the original error is attached as cause

        Returns:
            **str** or **None**: Retrieved Session ID or None (if none could be read, the
            file has wrong owner/permissions or the stored Session ID is expired)

        """
        if not os.path.isfile(self.file_path):
            return None

        if not SessionStore._validate_file_owner_and_permissions(self.file_path):
            return None

        with open(self.file_path) as f:
            content = f.read()

        try:
            data = json.loads(content)
            self.value = data['session_id']
            self.is_legacy = data.get('is_legacy', False)

            self.created = SessionStore._utc_from_timestamp(int(data['created']))
            self.expires = self.created + datetime.timedelta(seconds=self.timeout)

            if self.expires > SessionStore._utc_now():
                return self.value  # still valid
        except (ValueError, KeyError):
            # not JSON / "created" is not a number / required key missing
            return None
        except Exception as err:
            raise Exception(f"Exception Type: {type(err)}: {err}") from err

        return None  # expired

    def write(self, new_value):
        """Write and store a Session ID to file (rw for user only)

        Args:
            new_value (str): if none then empty value will be written to file

        Raises:
            OSError: if the file already exists with wrong owner/permissions or if the
                validation fails after writing

        Returns:
            **bool**: **True** if successful (failures raise).

        """
        self.value = new_value

        if os.path.isfile(self.file_path):
            if not SessionStore._validate_file_owner_and_permissions(self.file_path):
                raise OSError("File exists but is not ok (wrong owner/permissions)!")

        record = json.dumps({'created': str(int(time.time())),
                             'session_id': self.value,
                             'is_legacy': self.is_legacy})

        # create the file with mode 0600 right away, so the Session ID is never
        # readable by other users - not even between creating and chmod
        fd = os.open(self.file_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                     SessionStore.FILE_MODE)
        with os.fdopen(fd, 'w') as f:
            f.write(record)
        os.chmod(self.file_path, SessionStore.FILE_MODE)

        # TODO 2016-04-23 (RH): check this
        if not SessionStore._validate_file_owner_and_permissions(self.file_path):
            raise OSError("Race condition: Something happened to file during the run!")

        return True

    def delete(self):
        """remove session id file (e.g. when it only contains an invalid session id)

        Raises:
            NotImplementedError: always (not implemented yet)

        .. todo::
            (RH) implement this _remove_session_id_file
        """
        raise NotImplementedError("Not yet done")

    @staticmethod
    def _validate_file_owner_and_permissions(full_file_path):
        """validate SessionStore file ownership and permissions

        Args:
            full_file_path (str): full path to file on disc

        Raises:
            OSError: if full_file_path does not exist or is not a file

        Returns:
            **bool**: **True** if valid and correct (or running on Windows), otherwise **False**...

        """
        if not os.path.isfile(full_file_path):
            raise OSError(f"Does not exist or not a file: {full_file_path}")

        if OS_TYPE_WINDOWS:  # We have to do this, as os.getuid() is not defined if run on Windows
            return True

        file_lstat = os.lstat(full_file_path)
        if file_lstat.st_uid != os.getuid():
            return False

        # check for unix permission User R+W only (0600)
        if file_lstat.st_mode & 0o777 != SessionStore.FILE_MODE:
            return False

        return True
888class Client:
889 """PyOTRS Client class - includes Session handling
891 Args:
892 baseurl (str): Base URL for OTRS System, no trailing slash e.g. http://otrs.example.com
893 username (str): Username
894 password (str): Password
895 session_id_file (str): Session ID path on disc, used to persistently store Session ID
896 session_timeout (int): Session Timeout configured in OTRS (usually 28800 seconds = 8h)
897 session_validation_ticket_id (int): Ticket ID of an existing ticket - used to perform
898 several check - e.g. validate log in (defaults to 1)
899 webservice_config_ticket (dict): OTRS REST Web Service Name - Ticket Connector
900 webservice_config_faq (dict): OTRS REST Web Service Name - FAQ Connector (FAQ code was
901 deprecated in 0.4.0 and removed in 1.0.0 - parameter stays for compatibility)
902 webservice_config_link (dict): OTRS REST Web Service Name - Link Connector
903 proxies (dict): Proxy settings - refer to requests docs for
904 more information - default to no proxies
905 https_verify (bool): Should HTTPS certificates be verified (defaults to True)
906 ca_cert_bundle (str): file path - if specified overrides python/system default for
907 Root CA bundle that will be used.
908 auth (tuple): e.g. ("user", "pass") - see requests documentation ("auth") for details
909 client_auth_cert (str): file path containing both certificate and key (unencrypted) in
910 PEM format to use for TLS client authentication (passed to requests as "cert")
911 customer_user (bool): flag to indicate that the username is for a "CustomerUser"
912 (defaults to False)
913 user_agent (str): optional HTTP UserAgent string
914 webservice_path (str): OTRS REST Web Service Path part - defaults to
915 "/otrs/nph-genericinterface.pl/Webservice/"
916 use_legacy_sessions (bool): if True, use sessions compatibility for OTRS < V8
917 request_timeout (float or tuple): optional How many seconds to wait for the server
918 to send data before giving up, as a float, or a (connect timeout, read timeout) tuple
920 """
def __init__(self,
             baseurl=None,
             username=None,
             password=None,
             session_id_file=None,
             session_timeout=None,
             session_validation_ticket_id=1,
             webservice_config_ticket=None,
             webservice_config_faq=None,
             webservice_config_link=None,
             proxies=None,
             https_verify=True,
             ca_cert_bundle=None,
             auth=None,
             client_auth_cert=None,
             customer_user=False,
             user_agent=None,
             webservice_path="/otrs/nph-genericinterface.pl/Webservice/",
             use_legacy_sessions=False,
             request_timeout=None
             ):
    """set up the client (does not send any request)

    Raises:
        ArgumentMissingError: if baseurl is missing
        ValueError: if proxies is not a dict or ca_cert_bundle is not an existing file
    """
    if not baseurl:
        raise ArgumentMissingError("baseurl")
    self.baseurl = baseurl.rstrip("/")
    self.webservice_path = webservice_path

    # 8 hours (28800 seconds) is OTRS default
    self.session_timeout = session_timeout if session_timeout else 28800

    # where to persist the Session ID: explicit file or an OS dependent default
    if session_id_file:
        store_path = session_id_file
    elif OS_TYPE_WINDOWS:
        # No '/tmp/...' available on Windows OS, so using in Windows Temp folder instead
        store_path = os.path.expanduser('~\\AppData\\Local\\Temp\\.pyotrs_session_id')
    else:
        store_path = "/tmp/.pyotrs_session_id"  # Default for Linux
    self.session_id_store = SessionStore(file_path=store_path,
                                         session_timeout=self.session_timeout,
                                         is_legacy=use_legacy_sessions)

    self.session_validation_ticket_id = session_validation_ticket_id

    # A dictionary for mapping OTRS WebService operations to HTTP Method, Route and
    # Result string.
    ticket_cfg = webservice_config_ticket
    if not ticket_cfg:
        ticket_cfg = TICKET_CONNECTOR_CONFIG_DEFAULT
    link_cfg = webservice_config_link
    if not link_cfg:
        link_cfg = LINK_CONNECTOR_CONFIG_DEFAULT

    self.ws_ticket = ticket_cfg['Name']
    self.ws_link = link_cfg['Name']

    self.routes_ticket = [entry["Route"] for entry in ticket_cfg['Config'].values()]
    self.routes_link = [entry["Route"] for entry in link_cfg['Config'].values()]

    # one lookup table for all operations (link entries win on duplicate names)
    merged_cfg = {}
    merged_cfg.update(ticket_cfg['Config'])
    merged_cfg.update(link_cfg['Config'])
    self.ws_config = merged_cfg

    if proxies and not isinstance(proxies, dict):
        raise ValueError("Proxy settings need to be provided as dict!")
    self.proxies = proxies if proxies else {"http": "", "https": "", "no": ""}

    # https_verify ends up as False, as the value passed in or as path of a CA bundle
    if not https_verify:
        self.https_verify = False
    elif ca_cert_bundle:
        ca_certs = os.path.abspath(ca_cert_bundle)
        if not os.path.isfile(ca_certs):
            raise ValueError(f"Certificate file does not exist: {ca_certs}")
        self.https_verify = ca_certs
    else:
        self.https_verify = https_verify

    self.auth = auth
    self.client_auth_cert = client_auth_cert
    self.request_timeout = request_timeout
    self.customer_user = customer_user
    self.user_agent = user_agent
    self.use_legacy_sessions = use_legacy_sessions

    # credentials
    self.username = username
    self.password = password

    # dummy initialization
    self.operation = None
    self.result_json = None
    self.result = []
1028 """
1029 Returns the correct session key to look for/send based
1030 on the current session compatibility level
1031 for legacy sessions (< OTRS V8) this returns 'SessionID'
1032 from V8 onwards this returns 'AccessToken'
1033 """
1035 @property
1036 def _session_key(self):
1037 if self.use_legacy_sessions:
1038 return 'SessionID'
1039 else:
1040 return 'AccessToken'
"""
GenericInterface::Operation::Session::SessionCreate
    * session_check_is_valid  # deprecated
    * session_create
    * session_get
    * session_restore_or_create  # try to get session_id from a (json) file on disc
    * session_restore_or_set_up_new  # deprecated
"""
@deprecation.deprecated(deprecated_in="0.10.0", removed_in="2.0", current_version=__version__,
                        details="This method was implemented with a \"dirty\" workaround and "
                                "should not be called anymore. Use Client.session_get() "
                                "instead. ATTENTION: Using session_get() requires the OTRS "
                                "route for HTTP GET /Session to be configured (which is/was "
                                "not the default).")
def session_check_is_valid(self, session_id=None):
    """check whether session_id is currently valid

    Validation is done by requesting the ticket *self.session_validation_ticket_id*
    (operation "TicketGet") with the given session_id.

    Args:
        session_id (str): the Session ID to check (required)

    Raises:
        ArgumentMissingError: if session_id is not set

    Returns:
        the result of *self._parse_and_validate_response()* for the response.

    .. note::
        Uses HTTP Method: GET
    """
    self.operation = "TicketGet"

    if not session_id:
        raise ArgumentMissingError("session_id")

    # TODO 2016-04-13 (RH): Is there a nicer way to check whether session is valid?!
    return self._parse_and_validate_response(
        self._send_request({self._session_key: session_id},
                           data_id=self.session_validation_ticket_id))
def session_create(self):
    """create new (temporary) session (and Session ID)

    If parsing the response raises a ResponseParseError and legacy sessions are not
    in use yet, the client switches to legacy sessions and tries once more.

    Returns:
        **bool**: **True** if successful, otherwise **False**.

    .. note::
        Session ID is recorded in self.session_id_store.value (**non persistently**)

    .. note::
        Uses HTTP Method: POST

    """
    self.operation = "SessionCreate" if self.use_legacy_sessions else "AccessTokenCreate"

    # the login key differs for customer users
    login_key = "CustomerUserLogin" if self.customer_user else "UserLogin"
    payload = {login_key: self.username, "Password": self.password}

    response = self._send_request(payload)
    try:
        parsed_ok = self._parse_and_validate_response(response)
    except ResponseParseError:
        if self.use_legacy_sessions:
            return False
        log.warning("AccessTokenCreate failed, retrying using legacy session")
        self.use_legacy_sessions = True
        self.session_id_store.is_legacy = True
        return self.session_create()

    if not parsed_ok:
        return False

    self.session_id_store.value = self.result_json[self._session_key]
    return True
def session_get(self, session_id=None):
    """get/check/validate a Session ID

    Args:
        session_id (str): the Session ID to look up (required)

    Raises:
        ArgumentMissingError: if session_id is not set

    Returns:
        the result of *self._parse_and_validate_response()* for the response.

    .. note::
        Uses HTTP Method: GET

    """
    self.operation = "SessionGet"

    if not session_id:
        raise ArgumentMissingError("session_id")

    # the Session ID is sent as payload and also used as data_id
    request_payload = {self._session_key: session_id}
    return self._parse_and_validate_response(
        self._send_request(request_payload, data_id=session_id))
def session_restore_or_create(self):
    """Try to restore Session ID from file otherwise create new one and save to file

    Raises:
        SessionCreateError: if no new Session ID could be created
        OSError: if the Session ID could not be saved to file

    .. note::
        Session ID is recorded in self.session_id_store.value (**non persistently**)

    .. note::
        Session ID is **saved persistently** to file: *self.session_id_store.file_path*

    Returns:
        **bool**: **True** if successful (failures raise).
    """
    # 1) try to re-use the session_id stored in the file
    self.session_id_store.value = self.session_id_store.read()

    if self.session_id_store.value:
        # got one.. use stored is_legacy status info
        self.use_legacy_sessions = self.session_id_store.is_legacy
        # and check whether it's still valid
        still_valid = self.session_get(self.session_id_store.value)
        if still_valid:
            log.info(f"Using valid Session ID from ({self.session_id_store.file_path})")
            return True

    # 2) got no (valid) session_id; clean store
    self.session_id_store.write("")

    # 3) and try to create new one
    if not self.session_create():
        raise SessionCreateError("Failed to create a Session ID!")

    # 4) save new created session_id to file
    saved = self.session_id_store.write(self.result_json[self._session_key])
    if not saved:
        raise OSError("Failed to save Session ID to file!")

    log.info(f"Saved new Session ID to file: {self.session_id_store.file_path}")
    return True
@deprecation.deprecated(deprecated_in="0.10.0", removed_in="2.0", current_version=__version__,
                        details="This method uses session_check_is_valid which was "
                                "implemented with a \"dirty\" workaround and should not be "
                                "called anymore. Use Client.session_restore_or_create() "
                                "instead. ATTENTION: Using that also switches to "
                                "session_get() which requires the OTRS route for HTTP GET "
                                "/Session to be configured (which is/was not the default).")
def session_restore_or_set_up_new(self):
    """Restore the Session ID from file or create a new one and save it to file.

    Deprecated variant that validates a stored Session ID with
    session_check_is_valid(); an APIError raised by that check is treated like
    an invalid Session ID.

    Raises:
        SessionCreateError: if no new Session ID could be created
        OSError: if the new Session ID could not be written to file

    .. note::
        Session ID is recorded in self.session_id_store.value (**non persistently**)

    .. note::
        Session ID is **saved persistently** to file: *self.session_id_store.file_path*

    Returns:
        **bool**: **True** if successful.
    """
    # load whatever the store has on file
    self.session_id_store.value = self.session_id_store.read()

    if self.session_id_store.value:
        try:
            still_valid = self.session_check_is_valid(self.session_id_store.value)
        except APIError:
            # Most likely invalid session_id. It gets removed from the store below.
            still_valid = False
        if still_valid:
            log.info(f"Using valid Session ID from ({self.session_id_store.file_path})")
            return True

    # nothing usable on file: wipe the store before creating a fresh session
    self.session_id_store.write("")

    if not self.session_create():
        raise SessionCreateError("Failed to create a Session ID!")

    # persist the freshly created session_id
    if not self.session_id_store.write(self.result_json[self._session_key]):
        raise OSError("Failed to save Session ID to file!")

    log.info(f"Saved new Session ID to file: {self.session_id_store.file_path}")
    return True
1241 """
1242 GenericInterface::Operation::Ticket::TicketCreate
1243 * ticket_create
1244 """
def ticket_create(self,
                  ticket=None,
                  article=None,
                  attachments=None,
                  dynamic_fields=None,
                  **kwargs):
    """Create a Ticket

    Args:
        ticket (Ticket): a ticket object (required)
        article (Article): the first article of the ticket (required)
        attachments (list): *Attachment* objects
        dynamic_fields (list): *DynamicField* objects
        **kwargs: any regular OTRS Fields (not for Dynamic Fields!); they are
            added to the ticket via ticket.field_add()

    Raises:
        SessionNotCreated: if no Session ID is available
        ArgumentMissingError: if ticket or article is missing

    Returns:
        **dict** or **False**: dict if successful, otherwise **False**.
    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "TicketCreate"

    request_data = {self._session_key: self.session_id_store.value}

    if not ticket:
        raise ArgumentMissingError("Ticket")
    if not article:
        raise ArgumentMissingError("Article")

    if kwargs:
        ticket.field_add(kwargs)
    request_data.update(ticket.to_dct())

    # the article is mandatory at this point, so it is always validated and sent
    article.validate()
    request_data["Article"] = article.to_dct()

    if attachments:
        request_data["Attachment"] = [item.to_dct() for item in attachments]
    if dynamic_fields:
        request_data["DynamicField"] = [item.to_dct() for item in dynamic_fields]

    response = self._send_request(request_data)
    if self._parse_and_validate_response(response):
        return self.result_json
    return False
1299 """
1300 GenericInterface::Operation::Ticket::TicketGet
1302 * ticket_get_by_id
1303 * ticket_get_by_list
1304 * ticket_get_by_number
1305 """
def ticket_get_by_id(self,
                     ticket_id,
                     articles=False,
                     attachments=False,
                     dynamic_fields=True,
                     html_body_as_attachment=False):
    """Fetch a single ticket by its Ticket ID.

    Args:
        ticket_id (int): Integer value of a Ticket ID
        articles (bool): will request OTRS to include all
            Articles (*default: False*)
        attachments (bool): will request OTRS to include attachments (*default: False*)
        dynamic_fields (bool): will request OTRS to include all
            Dynamic Fields (*default: True*)
        html_body_as_attachment (bool): Optional, If enabled the HTML body version of
            each article is added to the attachments list

    Raises:
        SessionNotCreated: if no Session ID is available

    Returns:
        **Ticket** or **False**: Ticket object if successful, otherwise **False**.

    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "TicketGet"

    # the boolean switches are transmitted as 0/1
    switches = (("AllArticles", articles),
                ("Attachments", attachments),
                ("DynamicFields", dynamic_fields),
                ("HTMLBodyAsAttachment", html_body_as_attachment))

    query = {self._session_key: self.session_id_store.value,
             "TicketID": f"{ticket_id}"}
    query.update([(name, int(enabled)) for name, enabled in switches])

    if self._parse_and_validate_response(self._send_request(query, ticket_id)):
        # the parsed result is a list; a lookup by id yields its first entry
        return self.result[0]
    return False
def ticket_get_by_list(self,
                       ticket_id_list,
                       articles=False,
                       attachments=False,
                       dynamic_fields=True,
                       html_body_as_attachment=False):
    """Fetch several tickets at once by their Ticket IDs.

    Args:
        ticket_id_list (list): List of either String or Integer values
        articles (bool): will request OTRS to include all
            Articles (*default: False*)
        attachments (bool): will request OTRS to include attachments (*default: False*)
        dynamic_fields (bool): will request OTRS to include all
            Dynamic Fields (*default: True*)
        html_body_as_attachment (bool): Optional, If enabled the HTML body version of
            each article is added to the attachments list

    Raises:
        SessionNotCreated: if no Session ID is available
        ArgumentInvalidError: if ticket_id_list is not a list

    Returns:
        **list**: Ticket objects (as list) if successful, otherwise **False**.
        An empty ticket_id_list yields an empty list without sending a request.

    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "TicketGetList"

    if not isinstance(ticket_id_list, list):
        raise ArgumentInvalidError("Please provide list of IDs!")

    # asking for no tickets would only return an empty response
    if not ticket_id_list:
        return []

    # the boolean switches are transmitted as 0/1
    switches = (("AllArticles", articles),
                ("Attachments", attachments),
                ("DynamicFields", dynamic_fields),
                ("HTMLBodyAsAttachment", html_body_as_attachment))

    query = {self._session_key: self.session_id_store.value,
             "TicketID": ','.join(map(str, ticket_id_list))}
    query.update([(name, int(enabled)) for name, enabled in switches])

    if self._parse_and_validate_response(self._send_request(query)):
        return self.result
    return False
def ticket_get_by_number(self,
                         ticket_number,
                         articles=False,
                         attachments=False,
                         dynamic_fields=True,
                         html_body_as_attachment=False):
    """Fetch a single ticket by its Ticket Number.

    The number is resolved to a Ticket ID with ticket_search() and the ticket
    is then fetched with ticket_get_by_id().

    Args:
        ticket_number (str): Ticket Number as str
        articles (bool): will request OTRS to include all
            Articles (*default: False*)
        attachments (bool): will request OTRS to include attachments (*default: False*)
        dynamic_fields (bool): will request OTRS to include all
            Dynamic Fields (*default: True*)
        html_body_as_attachment (bool): Optional, If enabled the HTML body version of
            each article is added to the attachments list

    Raises:
        ArgumentInvalidError: if ticket_number is given as int
        ValueError: if the search yields more than one ticket

    Returns:
        **Ticket** or **False**: Ticket object if successful, otherwise **False**.

    """
    if isinstance(ticket_number, int):
        raise ArgumentInvalidError("Provide ticket_number as str/unicode. "
                                   "Got ticket_number as int.")

    found_ids = self.ticket_search(TicketNumber=ticket_number)
    if not found_ids:
        return False

    if len(found_ids) != 1:
        # TODO 2016-11-12 (RH): more than one ticket found for a specific ticket number
        raise ValueError(f"Found more than one result for Ticket Number: {ticket_number}")

    ticket = self.ticket_get_by_id(found_ids[0],
                                   articles=articles,
                                   attachments=attachments,
                                   dynamic_fields=dynamic_fields,
                                   html_body_as_attachment=html_body_as_attachment)
    return ticket if ticket else False
1445 """
1446 GenericInterface::Operation::Ticket::TicketHistoryGet
1448 * ticket_history_get_by_id
1449 """
def ticket_history_get_by_id(self, ticket_id):
    """Fetch the history of a ticket by its Ticket ID.

    Args:
        ticket_id (int): Integer value of a Ticket ID

    Raises:
        SessionNotCreated: if no Session ID is available

    Returns:
        **dict** or **False**: A dict("History") containing a list of dicts, otherwise **False**.

    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "TicketHistoryGet"

    query = {self._session_key: self.session_id_store.value}
    query["TicketID"] = f"{ticket_id}"

    if self._parse_and_validate_response(self._send_request(query, ticket_id)):
        # the parsed result is a list; only its first entry is handed back
        return self.result[0]
    return False
1477 """
1478 GenericInterface::Operation::Ticket::TicketSearch
1479 * ticket_search
1480 * ticket_search_full_text
1481 """
def ticket_search(self, dynamic_fields=None, **kwargs):
    """Search for tickets

    Args:
        dynamic_fields (list): List of DynamicField objects for which the search
            should be performed (a single DynamicField object is accepted as well)
        **kwargs: Arbitrary keyword arguments (not for DynamicField objects).

    Raises:
        SessionNotCreated: if no Session ID is available

    Returns:
        **list** or **False**: The search result (as list) if successful (can be an
        empty list: []), otherwise **False**.

    .. note::
        If value of kwargs is a datetime object then this object will be
        converted to the appropriate string format for OTRS API.

    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "TicketSearch"
    criteria = {self._session_key: self.session_id_store.value}

    if dynamic_fields:
        # treat a lone DynamicField like a one element list
        if isinstance(dynamic_fields, DynamicField):
            search_fields = [dynamic_fields]
        else:
            search_fields = dynamic_fields
        for search_field in search_fields:
            criteria.update(search_field.to_dct_search())

    for name, wanted in kwargs.items():
        if isinstance(wanted, datetime.datetime):
            wanted = wanted.strftime("%Y-%m-%d %H:%M:%S")
        criteria[name] = wanted

    if self._parse_and_validate_response(self._send_request(criteria)):
        return self.result
    return False
def ticket_search_full_text(self, pattern):
    """Wrapper around ticket_search() for a full text search in subject and body.

    Args:
        pattern (str): Search pattern (a '%' will be added to front and end automatically)

    Returns:
        **list** or **False**: The search result (as list) if successful,
        otherwise **False**.

    """
    self.operation = "TicketSearch"
    wildcard = f"%{pattern}%"

    # the subject/body search fields are named differently for legacy sessions
    if self.use_legacy_sessions:
        subject_field, body_field = "Subject", "Body"
    else:
        subject_field, body_field = "MIMEBase_Subject", "MIMEBase_Body"

    search_args = {"FullTextIndex": "1",
                   "ContentSearch": "OR",
                   subject_field: wildcard,
                   body_field: wildcard}
    return self.ticket_search(**search_args)
1551 """
1552 GenericInterface::Operation::Ticket::TicketUpdate
1553 * ticket_update
1554 * ticket_update_set_pending
1555 """
def ticket_update(self,
                  ticket_id,
                  article=None,
                  attachments=None,
                  dynamic_fields=None,
                  **kwargs):
    """Update a Ticket

    Args:
        ticket_id (int): Ticket ID as integer value
        article (Article): **optional** one *Article* that will be added to the ticket
        attachments (list): list of one or more *Attachment* objects that will
            be added to ticket. Also requires an *Article*!
        dynamic_fields (list): *DynamicField* objects
        **kwargs: any regular Ticket Fields (not for Dynamic Fields!); they are
            sent together under the "Ticket" key

    Raises:
        SessionNotCreated: if no Session ID is available
        ArgumentMissingError: if attachments are given without an article

    Returns:
        **dict** or **False**: A dict if successful, otherwise **False**.
    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "TicketUpdate"

    request_data = {self._session_key: self.session_id_store.value,
                    "TicketID": ticket_id}

    if article:
        article.validate()
        request_data["Article"] = article.to_dct()

    if attachments:
        # attachments can only be transmitted as part of an article
        if not article:
            raise ArgumentMissingError("To create an attachment an article is needed!")
        request_data["Attachment"] = [item.to_dct() for item in attachments]

    if dynamic_fields:
        request_data["DynamicField"] = [item.to_dct() for item in dynamic_fields]

    if kwargs:
        request_data["Ticket"] = dict(kwargs)

    response = self._send_request(request_data, ticket_id)
    if self._parse_and_validate_response(response):
        return self.result_json
    return False
def ticket_update_set_pending(self,
                              ticket_id,
                              new_state="pending reminder",
                              pending_days=1,
                              pending_hours=0):
    """Set a ticket to a pending state with a pending time relative to now.

    The pending time is "now (UTC) + pending_days + pending_hours"; it is
    converted with Ticket.datetime_to_pending_time_text() and sent together
    with the new state through ticket_update().

    Args:
        ticket_id (int): Ticket ID as integer value
        new_state (str): defaults to "pending reminder"
        pending_days (int): defaults to 1
        pending_hours (int): defaults to 0

    Returns:
        **dict** or **False**: A dict if successful, otherwise **False**.

    .. note::
        Operates in UTC
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware "now" in
    # UTC and drop the tzinfo again so that the value stays a naive UTC datetime
    # exactly like utcnow() used to return.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    pending_till = now_utc + datetime.timedelta(days=pending_days, hours=pending_hours)

    pending_text = Ticket.datetime_to_pending_time_text(datetime_object=pending_till)

    return self.ticket_update(ticket_id, State=new_state, PendingTime=pending_text)
1635 """
1636 GenericInterface::Operation::Link::LinkAdd
1637 * link_add
1638 """
def link_add(self,
             src_object_id,
             dst_object_id,
             src_object_type="Ticket",
             dst_object_type="Ticket",
             link_type="Normal",
             state="Valid"):
    """Create a link between two objects.

    Args:
        src_object_id (int): Integer value of source object ID
        dst_object_id (int): Integer value of destination object ID
        src_object_type (str): Object type of source; e.g. "Ticket", "FAQ"...
            (*default: Ticket*)
        dst_object_type (str): Object type of destination; e.g. "Ticket", "FAQ"...
            (*default: Ticket*)
        link_type (str): Type of the link: "Normal" or "ParentChild" (*default: Normal*)
        state (str): State of the link (*default: Valid*)

    Raises:
        SessionNotCreated: if no Session ID is available

    Returns:
        **True** or **False**: True if successful, otherwise **False**.

    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "LinkAdd"

    # both object ids are transmitted as integers
    link_data = {self._session_key: self.session_id_store.value}
    link_data["SourceObject"] = src_object_type
    link_data["SourceKey"] = int(src_object_id)
    link_data["TargetObject"] = dst_object_type
    link_data["TargetKey"] = int(dst_object_id)
    link_data["Type"] = link_type
    link_data["State"] = state

    response = self._send_request(link_data)
    return self._parse_and_validate_response(response)
1680 """
1681 GenericInterface::Operation::Link::LinkDelete
1682 * link_delete
1683 """
def link_delete(self,
                src_object_id,
                dst_object_id,
                src_object_type="Ticket",
                dst_object_type="Ticket",
                link_type="Normal"):
    """Remove a link between two objects.

    Args:
        src_object_id (int): Integer value of source object ID
        dst_object_id (int): Integer value of destination object ID
        src_object_type (str): Object type of source; e.g. "Ticket", "FAQ"...
            (*default: Ticket*)
        dst_object_type (str): Object type of destination; e.g. "Ticket", "FAQ"...
            (*default: Ticket*)
        link_type (str): Type of the link: "Normal" or "ParentChild" (*default: Normal*)

    Raises:
        SessionNotCreated: if no Session ID is available

    Returns:
        **True** or **False**: True if successful, otherwise **False**.

    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "LinkDelete"

    # both object ids are transmitted as integers
    link_data = {self._session_key: self.session_id_store.value}
    link_data["Object1"] = src_object_type
    link_data["Key1"] = int(src_object_id)
    link_data["Object2"] = dst_object_type
    link_data["Key2"] = int(dst_object_id)
    link_data["Type"] = link_type

    response = self._send_request(link_data)
    return self._parse_and_validate_response(response)
1722 """
1723 GenericInterface::Operation::Link::LinkDeleteAll
1724 * link_delete_all
1725 """
def link_delete_all(self,
                    object_id,
                    object_type="Ticket"):
    """Remove all links of an object.

    Args:
        object_id (int): Integer value of source object ID
        object_type (str): Object type of source; e.g. "Ticket", "FAQ"...
            (*default: Ticket*)

    Raises:
        SessionNotCreated: if no Session ID is available

    Returns:
        **True** or **False**: True if successful, otherwise **False**.

    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "LinkDeleteAll"

    # the object id is transmitted as integer
    link_data = {self._session_key: self.session_id_store.value}
    link_data["Object"] = object_type
    link_data["Key"] = int(object_id)

    response = self._send_request(link_data)
    return self._parse_and_validate_response(response)
1754 """
1755 GenericInterface::Operation::Link::LinkList
1756 * link_list
1757 """
def link_list(self,
              src_object_id,
              src_object_type="Ticket",
              dst_object_type=None,
              state="Valid",
              link_type=None,
              direction=None):
    """List the links of an object.

    Args:
        src_object_id (int): Integer value of source object ID
        src_object_type (str): Object type of source; e.g. "Ticket", "FAQ"...
            (*default: Ticket*)
        dst_object_type (str): Object type of destination; e.g. "Ticket", "FAQ"...
            Optional restriction of the object where the links point to.
            (*default: None, i.e. not sent*)
        state (str): State of the link (*default: Valid*)
        link_type (str): Optional restriction of the link type: "Normal" or
            "ParentChild" (*default: None, i.e. not sent*)
        direction (str): Optional restriction of the link direction ('Source' or 'Target').

    Raises:
        SessionNotCreated: if no Session ID is available

    Returns:
        **list** or **None**: List of found dict links if successful, if empty **None**.

    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "LinkList"

    query = {self._session_key: self.session_id_store.value,
             "Object": src_object_type,
             "Key": int(src_object_id),
             "State": state}

    # optional restrictions are only sent when they were actually given
    optional_restrictions = (("Object2", dst_object_type),
                             ("Type", link_type),
                             ("Direction", direction))
    for name, restriction in optional_restrictions:
        if restriction:
            query[name] = restriction

    if self._parse_and_validate_response(self._send_request(query)):
        return self.result
    return None
1808 """
1809 GenericInterface::Operation::Link::PossibleLinkList
1810 * link_possible_link_list
1811 """
def link_possible_link_list(self):
    """Query the ``PossibleLinkList`` operation.

    Raises:
        SessionNotCreated: if no Session ID is available

    Returns:
        **List** or **False**: List if successful, otherwise **False**.

    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "PossibleLinkList"

    # only the session information is needed for this request
    query = {self._session_key: self.session_id_store.value}

    response = self._send_request(query)
    if not self._parse_and_validate_response(response):
        return False
    return self.result
1834 """
1835 GenericInterface::Operation::Link::PossibleObjectsList
1836 * link_possible_objects_list
1837 """
def link_possible_objects_list(self,
                               object_type="Ticket"):
    """Query the ``PossibleObjectsList`` operation for an object type.

    Args:
        object_type (str): Object type; e.g. "Ticket", "FAQ"...
            (*default: Ticket*)

    Raises:
        SessionNotCreated: if no Session ID is available

    Returns:
        **List** or **False**: List if successful, otherwise **False**.

    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "PossibleObjectsList"

    query = {self._session_key: self.session_id_store.value}
    query["Object"] = object_type

    response = self._send_request(query)
    if not self._parse_and_validate_response(response):
        return False
    return self.result
1866 """
1867 GenericInterface::Operation::Link::PossibleTypesList
1868 * link_possible_types_list
1869 """
def link_possible_types_list(self,
                             src_object_type="Ticket",
                             dst_object_type="Ticket"):
    """Query the ``PossibleTypesList`` operation for a pair of object types.

    Args:
        src_object_type (str): Object type of source; e.g. "Ticket", "FAQ"...
            (*default: Ticket*)
        dst_object_type (str): Object type of destination; e.g. "Ticket", "FAQ"...
            (*default: Ticket*)

    Raises:
        SessionNotCreated: if no Session ID is available

    Returns:
        **List** or **False**: List if successful, otherwise **False**.

    """
    if not self.session_id_store.value:
        raise SessionNotCreated("Call session_create() or "
                                "session_restore_or_create() first")
    self.operation = "PossibleTypesList"

    query = {self._session_key: self.session_id_store.value}
    query["Object1"] = src_object_type
    query["Object2"] = dst_object_type

    response = self._send_request(query)
    if not self._parse_and_validate_response(response):
        return False
    return self.result
def _build_url(self, data_id=None):
    """Build the URL for the current operation and remember it in self._url.

    The route of ``self.operation`` is looked up in ``self.ws_config``. Routes
    with a placeholder (``:TicketID`` or ``:SessionID``) get *data_id* appended
    and are served by the ticket connector; all other routes are assigned to the
    ticket or the link connector depending on which route list contains them.

    Args:
        data_id (optional[int]): value for the route placeholder (if any)

    Raises:
        ValueError: if the route needs a data_id but none was given, if the
            route placeholder is not supported or if the route belongs to
            neither the ticket nor the link connector.

    Returns:
        **str**: The complete URL where the request will be send to.

    """
    route = self.ws_config[self.operation]["Route"]
    url_prefix = f"{self.baseurl}{self.webservice_path}"

    if ":" in route:
        route_split = route.split(":")
        route, route_arg = route_split[0], route_split[1]

        # Fail loudly instead of silently handing back the URL of a previous request.
        if route_arg not in ("TicketID", "SessionID"):
            raise ValueError(f"Unsupported route argument: {route_arg}")
        if not data_id:
            raise ValueError(f"{route_arg} is None but Route requires "
                             f"{route_arg}: {route}")
        self._url = f"{url_prefix}{self.ws_ticket}{route}{data_id}"
    elif route in self.routes_ticket:
        self._url = f"{url_prefix}{self.ws_ticket}{route}"
    elif route in self.routes_link:
        self._url = f"{url_prefix}{self.ws_link}{route}"
    else:
        # Fail loudly instead of silently handing back the URL of a previous request.
        raise ValueError(f"Route is neither a ticket nor a link route: {route}")

    return self._url
def _send_request(self, payload=None, data_id=None):
    """Send the API request using the *requests.request* method.

    Method and expected result key are taken from the web service configuration
    of ``self.operation``. For GET the payload is sent as URL parameters, for
    every other method it is sent as JSON body.

    Args:
        payload (dict): data that is sent to the API
        data_id (optional): value for the route placeholder; handed to _build_url()

    Raises:
        ArgumentMissingError: if no payload was given
        ValueError: if the configured HTTP method is not supported
        HTTPError: if the request fails or the HTTP status code is not 200

    Returns:
        **requests.Response**: Response received after sending the request.

    .. note::
        Supported HTTP Methods: DELETE, GET, HEAD, PATCH, POST, PUT
    """
    if not payload:
        raise ArgumentMissingError("payload")

    self._result_type = self.ws_config[self.operation]["Result"]

    url = self._build_url(data_id)

    http_method = self.ws_config[self.operation]["RequestMethod"]

    if http_method not in ["DELETE", "GET", "HEAD", "PATCH", "POST", "PUT"]:
        raise ValueError("invalid http_method")

    headers = {}

    if self.user_agent:
        headers["User-Agent"] = self.user_agent

    # GET transports the payload in the query string, everything else as JSON body
    if http_method == "GET":
        payload_kwargs = {"params": payload}
    else:
        headers["Content-Type"] = "application/json"
        payload_kwargs = {"data": json.dumps(payload)}

    try:
        response = requests.request(http_method,
                                    url,
                                    headers=headers,
                                    proxies=self.proxies,
                                    verify=self.https_verify,
                                    cert=self.client_auth_cert,
                                    auth=self.auth,
                                    timeout=self.request_timeout,
                                    **payload_kwargs)

        # store a copy of the request
        self._request = response.request

    # critical error: HTTP request resulted in an error!
    except Exception as err:
        # chain the original exception so that the root cause stays visible
        raise HTTPError("Failed to access OTRS. Check Hostname, Proxy, SSL Certificate!\n"
                        f"Error with http communication: {err}") from err

    if response.status_code != 200:
        raise HTTPError("Received HTTP Error. Check Hostname and WebServiceName.\n"
                        f"HTTP Status Code: {response.status_code}\n"
                        f"HTTP Message: {response.content}")
    return response
def _parse_and_validate_response(self, response):
    """Parse the JSON of a response and store the outcome on the client.

    The decoded JSON is kept in ``self.result_json``; the part that belongs to
    the current operation is stored in ``self.result``. Status code and raw
    content of the response are recorded as well.

    Args:
        response (requests.Response): result of _send_request

    Raises:
        ValueError: if response is no requests.Response object or if
            self.operation is not part of the web service configuration
        APIError: if the OTRS API reported an error
        ResponseParseError: if the response contains neither the expected
            result key nor an "Error" key

    Returns:
        **bool**: **True** if successful; **False** for a SessionGet that
        reports an invalid session

    """

    if not isinstance(response, requests.models.Response):
        raise ValueError("requests.Response object expected!")

    if self.operation not in self.ws_config:
        raise ValueError("invalid operation")

    # clear data from Client
    self.result = None
    self._result_error = False

    # get and set new data
    self.result_json = response.json()
    self._result_status_code = response.status_code
    self._result_content = response.content

    # handle TicketSearch operation first. special: empty search result has no "TicketID"
    if self.operation == "TicketSearch":
        if not self.result_json:
            self.result = []
            return True
        # read the same (configured) key that is checked instead of a hard-coded one
        found_ids = self.result_json.get(self._result_type, None)
        if found_ids:
            self.result = found_ids
            return True

    # now handle SessionGet operation
    if self.operation == "SessionGet":
        # For SessionGet the "Result" that is returned is different when legacy session
        # are used.
        # SessionData was default in OTRS <= 7 / PyOTRS used it as default from v0.1.
        # AccessTokenData was introduced in OTRS 8 - for CustomerUser already in 7 (?!).
        # Unless the dict was modified - use the hardcoded defaults accordingly.
        if self._result_type not in ["AccessTokenData", "SessionData"]:
            session_result_type = self._result_type
        elif self.use_legacy_sessions:
            session_result_type = "SessionData"
        else:
            session_result_type = "AccessTokenData"

        session_data = self.result_json.get(session_result_type, None)
        if session_data:  # received SessionData -> Session ID is valid
            self._result_error = False
            self.result = session_data
            return True
        if self.result_json["Error"]["ErrorCode"] == "SessionGet.SessionInvalid":
            return False
        raise APIError("Failed to access OTRS API.\n"
                       "OTRS Error Code: {}\nOTRS Error Message: {}"
                       "".format(self.result_json["Error"]["ErrorCode"],
                                 self.result_json["Error"]["ErrorMessage"]))

    # handle Link operations; Add, Delete, DeleteAll return: {"Success":1}
    if self.operation in ["LinkAdd", "LinkDelete", "LinkDeleteAll"]:
        if self.result_json.get("Success", None) == 1:
            return True

    # LinkList result can be empty
    # (compare for equality: `in "LinkList"` would be a substring test)
    if self.operation == "LinkList":
        self.result = self.result_json.get("LinkList", None) or None
        return True

    # now handle other operations
    if self.result_json.get(self._result_type, None):
        self._result_error = False
        self.result = self.result_json[self._result_type]
    elif self.result_json.get("Error", None):
        self._result_error = True
    else:
        self._result_error = True
        # critical error: Unknown response from OTRS API - FAIL NOW!
        raise ResponseParseError("Unknown key in response JSON DICT!")

    # report error
    if self._result_error:
        raise APIError("Failed to access OTRS API. Check Username and Password! "
                       "Session ID expired?! Does Ticket exist?\n"
                       "OTRS Error Code: {}\nOTRS Error Message: {}"
                       "".format(self.result_json["Error"]["ErrorCode"],
                                 self.result_json["Error"]["ErrorMessage"]))

    # for operation TicketGet: parse result list into Ticket object list
    if self.operation in ("TicketGet", "TicketGetList"):
        # self.result already holds the list found under the configured result key
        self.result = [Ticket(item) for item in self.result]

    return True
2134# EOF