Coverage for pyotrs/lib.py: 93%

717 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-04-06 19:25 +0000

1""" lib.py 

2 

3PyOTRS lib 

4 

5This code implements the PyOTRS library to provide access to the OTRS API (REST) 

6""" 

7 

8import base64 

9import datetime 

10import json 

11import logging 

12import mimetypes 

13import os 

14import time 

15 

16import deprecation 

17import requests 

18 

19from .version import __version__ 

20 

21log = logging.getLogger(__name__) 

22 

23TICKET_CONNECTOR_CONFIG_DEFAULT = { 

24 'Name': 'GenericTicketConnectorREST', 

25 'Config': { 

26 'SessionCreate': {'RequestMethod': 'POST', 

27 'Route': '/Session', 

28 'Result': 'SessionID'}, 

29 'AccessTokenCreate': {'RequestMethod': 'POST', 

30 'Route': '/Session', 

31 'Result': 'AccessToken'}, 

32 'SessionGet': {'RequestMethod': 'GET', 

33 'Route': '/Session/:SessionID', 

34 'Result': 'SessionData'}, 

35 'TicketCreate': {'RequestMethod': 'POST', 

36 'Route': '/Ticket', 

37 'Result': 'TicketID'}, 

38 'TicketGet': {'RequestMethod': 'GET', 

39 'Route': '/Ticket/:TicketID', 

40 'Result': 'Ticket'}, 

41 'TicketGetList': {'RequestMethod': 'GET', 

42 'Route': '/TicketList', 

43 'Result': 'Ticket'}, 

44 'TicketHistoryGet': {'RequestMethod': 'GET', 

45 'Route': '/TicketHistory/:TicketID', 

46 'Result': 'TicketHistory'}, 

47 'TicketSearch': {'RequestMethod': 'GET', 

48 'Route': '/Ticket', 

49 'Result': 'TicketID'}, 

50 'TicketUpdate': {'RequestMethod': 'PATCH', 

51 'Route': '/Ticket/:TicketID', 

52 'Result': 'TicketID'}, 

53 } 

54} 

55 

56LINK_CONNECTOR_CONFIG_DEFAULT = { 

57 'Name': 'GenericLinkConnectorREST', 

58 'Config': { 

59 'LinkAdd': {'RequestMethod': 'POST', 

60 'Route': '/LinkAdd', 

61 'Result': 'LinkAdd'}, 

62 'LinkDelete': {'RequestMethod': 'DELETE', 

63 'Route': '/LinkDelete', 

64 'Result': 'LinkDelete'}, 

65 'LinkDeleteAll': {'RequestMethod': 'DELETE', 

66 'Route': '/LinkDeleteAll', 

67 'Result': 'LinkDeleteAll'}, 

68 'LinkList': {'RequestMethod': 'GET', 

69 'Route': '/LinkList', 

70 'Result': 'LinkList'}, 

71 'PossibleLinkList': {'RequestMethod': 'GET', 

72 'Route': '/PossibleLinkList', 

73 'Result': 'PossibleLinkList'}, 

74 'PossibleObjectsList': {'RequestMethod': 'GET', 

75 'Route': '/PossibleObjectsList', 

76 'Result': 'PossibleObject'}, 

77 'PossibleTypesList': {'RequestMethod': 'GET', 

78 'Route': '/PossibleTypesList', 

79 'Result': 'PossibleType'}, 

80 } 

81} 

82 

83# Check if OS is Linux or Windows (#47): 

84OS_TYPE_WINDOWS = False 

85try: 

86 from platform import system 

87 

88 if 'Windows' in system(): 

89 OS_TYPE_WINDOWS = True 

90 log.debug("OS Type: Windows") 

91 else: 

92 log.debug("OS Type: Linux") 

93except: # noqa: E722 

94 log.warning("Could not determine OS-Type (Linux/Windows). Using default: 'Linux'.") 

95 

96 

97class PyOTRSError(Exception): 

98 def __init__(self, message): 

99 super().__init__(message) 

100 self.message = message 

101 

102 

103class ArgumentMissingError(PyOTRSError): 

104 pass 

105 

106 

107class ArgumentInvalidError(PyOTRSError): 

108 pass 

109 

110 

111class ResponseParseError(PyOTRSError): 

112 pass 

113 

114 

115class SessionCreateError(PyOTRSError): 

116 pass 

117 

118 

119class SessionNotCreated(PyOTRSError): # noqa: N818 

120 pass 

121 

122 

123class APIError(PyOTRSError): 

124 pass 

125 

126 

127class HTTPError(PyOTRSError): 

128 pass 

129 

130 

131class Article: 

132 """PyOTRS Article class """ 

133 

134 def __init__(self, dct): 

135 fields = {} 

136 for key, _value in dct.items(): 

137 fields.update({key: dct[key]}) 

138 

139 try: 

140 self.aid = int(fields.get("ArticleID")) 

141 except TypeError: 

142 self.aid = 0 

143 

144 self.fields = fields 

145 

146 self.attachments = self._parse_attachments() 

147 self.fields.pop("Attachment", None) 

148 

149 self.dynamic_fields = self._parse_dynamic_fields() 

150 self.fields.pop("DynamicField", None) 

151 

152 def __repr__(self): 

153 if self.aid != 0: 

154 _len = len(self.attachments) 

155 if _len == 0: 

156 return f"<ArticleID: {self.aid}>" 

157 elif _len == 1: 

158 return f"<ArticleID: {self.aid} (1 Attachment)>" 

159 else: 

160 return f"<ArticleID: {self.aid} ({_len} Attachments)>" 

161 else: 

162 return f"<{self.__class__.__name__}>" 

163 

164 def to_dct(self, attachments=True, attachment_cont=True, dynamic_fields=True): 

165 """represent as nested dict compatible for OTRS 

166 

167 Args: 

168 attachments (bool): if True will include, otherwise exclude: 

169 "Attachment" (default: True) 

170 attachment_cont (bool): if True will include, otherwise exclude: 

171 "Attachment" > "Content" (default: True) 

172 dynamic_fields (bool): if True will include, otherwise exclude: 

173 "DynamicField" (default: True) 

174 

175 Returns: 

176 **dict**: Article represented as dict for OTRS 

177 

178 """ 

179 dct = {} 

180 

181 if attachments: 

182 if self.attachments: 

183 dct.update({"Attachment": [x.to_dct(content=attachment_cont) for x in 

184 self.attachments]}) 

185 

186 if dynamic_fields: 

187 if self.dynamic_fields: 

188 dct.update({"DynamicField": [x.to_dct() for x in self.dynamic_fields]}) 

189 

190 if self.fields: 

191 dct.update(self.fields) 

192 

193 return dct 

194 

195 def _parse_attachments(self): 

196 """parse Attachment from Ticket and return as **list** of **Attachment** objects""" 

197 lst = self.fields.get("Attachment") 

198 if lst: 

199 return [Attachment(item) for item in lst] 

200 else: 

201 return [] 

202 

203 def _parse_dynamic_fields(self): 

204 """parse DynamicField from Ticket and return as **list** of **DynamicField** objects""" 

205 lst = self.fields.get("DynamicField") 

206 if lst: 

207 return [DynamicField.from_dct(item) for item in lst] 

208 else: 

209 return [] 

210 

211 def attachment_get(self, a_filename): 

212 """attachment_get 

213 

214 Args: 

215 a_filename (str): Filename of Attachment to retrieve 

216 

217 Returns: 

218 **Attachment** or **None** 

219 

220 """ 

221 result = [x for x in self.attachments if x.Filename == f"{a_filename}"] 

222 if result: 

223 return result[0] 

224 else: 

225 return None 

226 

227 def dynamic_field_get(self, df_name): 

228 """dynamic_field_get 

229 

230 Args: 

231 df_name (str): Name of DynamicField to retrieve 

232 

233 Returns: 

234 **DynamicField** or **None** 

235 

236 """ 

237 

238 result = [x for x in self.dynamic_fields if x.name == f"{df_name}"] 

239 if result: 

240 return result[0] 

241 else: 

242 return None 

243 

244 def field_get(self, f_name): 

245 return self.fields.get(f_name) 

246 

247 def validate(self, validation_map=None): 

248 """validate data against a mapping dict - if a key is not present 

249 then set it with a default value according to dict 

250 

251 Args: 

252 validation_map (dict): A mapping for all Article fields that have to be set. During 

253 validation every required field that is not set will be set to a default value 

254 specified in this dict. 

255 

256 .. note:: 

257 There is also a blacklist (fields to be removed) but this is currently 

258 hardcoded to *dynamic_fields* and *attachments*. 

259 

260 """ 

261 if not validation_map: 

262 validation_map = {"Body": "API created Article Body", 

263 "Charset": "UTF8", 

264 "MimeType": "text/plain", 

265 "Subject": "API created Article", 

266 "TimeUnit": 0} 

267 

268 for key, value in validation_map.items(): 

269 if not self.fields.get(key, None): 

270 self.fields.update({key: value}) 

271 

272 @classmethod 

273 def _dummy(cls): 

274 """dummy data (for testing) 

275 

276 Returns: 

277 **Article**: An Article object. 

278 

279 """ 

280 return Article({"Subject": "Dümmy Subject", 

281 "Body": "Hallo Bjørn,\n[kt]\n\n -- The End", 

282 "TimeUnit": 0, 

283 "MimeType": "text/plain", 

284 "Charset": "UTF8"}) 

285 

286 @classmethod 

287 def _dummy_force_notify(cls): 

288 """dummy data (for testing) 

289 

290 Returns: 

291 **Article**: An Article object. 

292 

293 """ 

294 return Article({"Subject": "Dümmy Subject", 

295 "Body": "Hallo Bjørn,\n[kt]\n\n -- The End", 

296 "TimeUnit": 0, 

297 "MimeType": "text/plain", 

298 "Charset": "UTF8", 

299 "ForceNotificationToUserID": [1, 2]}) 

300 

301 

302class Attachment: 

303 """PyOTRS Attachment class """ 

304 

305 def __init__(self, dct): 

306 self.__dict__ = dct 

307 

308 def __repr__(self): 

309 if hasattr(self, 'Filename'): 

310 return f"<{self.__class__.__name__}: {self.Filename}>" 

311 else: 

312 return f"<{self.__class__.__name__}>" 

313 

314 def to_dct(self, content=True): 

315 """represent Attachment object as dict 

316 Args: 

317 content (bool): if True will include, otherwise exclude: "Content" (default: True) 

318 

319 Returns: 

320 **dict**: Attachment represented as dict. 

321 

322 """ 

323 dct = self.__dict__ 

324 if content: 

325 return dct 

326 else: 

327 dct.pop("Content") 

328 return dct 

329 

330 @classmethod 

331 def create_basic(cls, Content=None, ContentType=None, Filename=None): # noqa: N803 

332 """create a basic Attachment object 

333 

334 Args: 

335 Content (str): base64 encoded content 

336 ContentType (str): MIME type of content (e.g. text/plain) 

337 Filename (str): file name (e.g. file.txt) 

338 

339 

340 Returns: 

341 **Attachment**: An Attachment object. 

342 

343 """ 

344 return Attachment({'Content': Content, 

345 'ContentType': ContentType, 

346 'Filename': Filename}) 

347 

348 @classmethod 

349 def create_from_file(cls, file_path): 

350 """save Attachment to a folder on disc 

351 

352 Args: 

353 file_path (str): The full path to the file from which an Attachment should be created. 

354 

355 Returns: 

356 **Attachment**: An Attachment object. 

357 

358 """ 

359 with open(file_path, 'rb') as f: 

360 content = f.read() 

361 

362 content_type = mimetypes.guess_type(file_path)[0] 

363 if not content_type: 

364 content_type = "application/octet-stream" 

365 return Attachment({'Content': base64.b64encode(content).decode('utf-8'), 

366 'ContentType': content_type, 

367 'Filename': os.path.basename(file_path)}) 

368 

369 def save_to_dir(self, folder="/tmp"): 

370 """save Attachment to a folder on disc 

371 

372 Args: 

373 folder (str): The directory where this attachment should be saved to. 

374 

375 Returns: 

376 **bool**: True 

377 

378 """ 

379 if not hasattr(self, 'Content') or not hasattr(self, 'Filename'): 

380 raise ValueError("invalid Attachment") 

381 

382 file_path = os.path.join(os.path.abspath(folder), self.Filename) 

383 with open(file_path, 'wb') as f: 

384 f.write(base64.b64decode(self.Content)) 

385 

386 return True 

387 

388 @classmethod 

389 def _dummy(cls): 

390 """dummy data (for testing) 

391 

392 Returns: 

393 **Attachment**: An Attachment object. 

394 

395 """ 

396 return Attachment.create_basic("YmFyCg==", "text/plain", "dümmy.txt") 

397 

398 

399class DynamicField: 

400 """PyOTRS DynamicField class 

401 

402 Args: 

403 name (str): Name of OTRS DynamicField (required) 

404 value (str): Value of OTRS DynamicField 

405 search_operator (str): Search operator (defaults to: "Equals") 

406 Valid options are: 

407 "Equals", "Like", "GreaterThan", "GreaterThanEquals", 

408 "SmallerThan", "SmallerThanEquals" 

409 search_patterns (list): List of patterns (str or datetime) to search for 

410 

411 .. warning:: 

412 **PyOTRS only supports OTRS 5 style!** 

413 DynamicField representation changed between OTRS 4 and OTRS 5. 

414 

415 """ 

416 

417 SEARCH_OPERATORS = ("Equals", "Like", "GreaterThan", "GreaterThanEquals", 

418 "SmallerThan", "SmallerThanEquals",) 

419 

420 def __init__(self, name, value=None, search_patterns=None, search_operator="Equals"): 

421 self.name = name 

422 self.value = value 

423 

424 if not isinstance(search_patterns, list): 

425 self.search_patterns = [search_patterns] 

426 else: 

427 self.search_patterns = search_patterns 

428 

429 if search_operator not in DynamicField.SEARCH_OPERATORS: 

430 raise NotImplementedError(f"Invalid Operator: \"{search_operator}\"") 

431 self.search_operator = search_operator 

432 

433 def __repr__(self): 

434 return f"<{self.__class__.__name__}: {self.name}: {self.value}>" 

435 

436 @classmethod 

437 def from_dct(cls, dct): 

438 """create DynamicField from dct 

439 

440 Args: 

441 dct (dict): 

442 

443 Returns: 

444 **DynamicField**: A DynamicField object. 

445 

446 """ 

447 return cls(name=dct["Name"], value=dct["Value"]) 

448 

449 def to_dct(self): 

450 """represent DynamicField as dict 

451 

452 Returns: 

453 **dict**: DynamicField as dict. 

454 

455 """ 

456 return {"Name": self.name, "Value": self.value} 

457 

458 def to_dct_search(self): 

459 """represent DynamicField as dict for search operations 

460 

461 Returns: 

462 **dict**: DynamicField as dict for search operations 

463 

464 """ 

465 _lst = [] 

466 for item in self.search_patterns: 

467 if isinstance(item, datetime.datetime): 

468 item = item.strftime("%Y-%m-%d %H:%M:%S") 

469 if self.search_operator == 'Like' and '*' not in item: 

470 item = f"*{item}*" 

471 _lst.append(item) 

472 

473 return {f"DynamicField_{self.name}": {self.search_operator: _lst}} 

474 

475 @classmethod 

476 def _dummy1(cls): 

477 """dummy1 data (for testing) 

478 

479 Returns: 

480 **DynamicField**: A list of DynamicField objects. 

481 

482 """ 

483 return DynamicField(name="firstname", value="Jane") 

484 

485 @classmethod 

486 def _dummy2(cls): 

487 """dummy2 data (for testing) 

488 

489 Returns: 

490 **DynamicField**: A list of DynamicField objects. 

491 

492 """ 

493 return DynamicField.from_dct({'Name': 'lastname', 'Value': 'Doe'}) 

494 

495 

496class Ticket: 

497 """PyOTRS Ticket class 

498 

499 Args: 

500 tid (int): OTRS Ticket ID as integer 

501 fields (dict): OTRS Top Level fields 

502 articles (list): List of Article objects 

503 dynamic_fields (list): List of DynamicField objects 

504 

505 """ 

506 

507 def __init__(self, dct): 

508 # store OTRS Top Level fields 

509 self.fields = {} 

510 self.fields.update(dct) 

511 

512 self.tid = int(self.fields.get("TicketID", 0)) 

513 self.articles = self._parse_articles() 

514 self.fields.pop("Article", None) 

515 

516 self.dynamic_fields = self._parse_dynamic_fields() 

517 self.fields.pop("DynamicField", None) 

518 

519 def __repr__(self): 

520 if self.tid: 

521 return f"<{self.__class__.__name__}: {self.tid}>" 

522 else: 

523 return f"<{self.__class__.__name__}>" 

524 

525 def _parse_articles(self): 

526 """parse Article from Ticket and return as **list** of **Article** objects""" 

527 lst = self.fields.get("Article", []) 

528 return [Article(item) for item in lst] 

529 

530 def _parse_dynamic_fields(self): 

531 """parse DynamicField from Ticket and return as **list** of **DynamicField** objects""" 

532 lst = self.fields.get("DynamicField", []) 

533 return [DynamicField.from_dct(item) for item in lst] 

534 

535 def to_dct(self, 

536 articles=True, 

537 article_attachments=True, 

538 article_attachment_cont=True, 

539 article_dynamic_fields=True, 

540 dynamic_fields=True): 

541 """represent as nested dict 

542 

543 Args: 

544 articles (bool): if True will include, otherwise exclude: 

545 "Article" (default: True) 

546 article_attachments (bool): if True will include, otherwise exclude: 

547 "Article" > "Attachment" (default: True) 

548 article_attachment_cont (bool): if True will include, otherwise exclude: 

549 "Article" > "Attachment" > "Content" (default: True) 

550 article_dynamic_fields (bool): if True will include, otherwise exclude: 

551 "Article" > "DynamicField" (default: True) 

552 dynamic_fields (bool): if True will include, otherwise exclude: 

553 "DynamicField" (default: True) 

554 

555 Returns: 

556 **dict**: Ticket represented as dict. 

557 

558 .. note:: 

559 Does not contain Articles or DynamicFields (currently) 

560 

561 """ 

562 dct = {} 

563 dct.update(self.fields) 

564 

565 if articles: 

566 try: 

567 if self.articles: 

568 dct.update({"Article": [x.to_dct(attachments=article_attachments, 

569 attachment_cont=article_attachment_cont, 

570 dynamic_fields=article_dynamic_fields) 

571 for x in self.articles]}) 

572 except AttributeError: 

573 pass 

574 

575 if dynamic_fields: 

576 try: 

577 if self.dynamic_fields: 

578 dct.update({"DynamicField": [x.to_dct() for x in self.dynamic_fields]}) 

579 except AttributeError: 

580 pass 

581 

582 return {"Ticket": dct} 

583 

584 def article_get(self, aid): 

585 """article_get 

586 

587 Args: 

588 aid (str): Article ID as either int or str 

589 

590 Returns: 

591 **Article** or **None** 

592 

593 """ 

594 result = [x for x in self.articles if x.field_get("ArticleID") == str(aid)] 

595 return result[0] if result else None 

596 

597 def dynamic_field_get(self, df_name): 

598 """dynamic_field_get 

599 

600 Args: 

601 df_name (str): Name of DynamicField to retrieve 

602 

603 Returns: 

604 **DynamicField** or **None** 

605 

606 """ 

607 result = [x for x in self.dynamic_fields if x.name == df_name] 

608 return result[0] if result else None 

609 

610 def field_get(self, f_name): 

611 return self.fields.get(f_name) 

612 

613 def field_add(self, fields=None, **kwargs): 

614 """add fields to ticket 

615 

616 Provide a dictionary of key:value pairs to be added to the ticket, 

617 e.g. the basic ticket. 

618 

619 Use it like `field_add({"SLA": "1h", "Service": "Ticket-Service"})` or via kwargs 

620 by providing key=value pairs: `field_add(SLA="1h", Service="Ticket-Service")`. 

621 

622 Args: 

623 fields (dict): Dictionary of new fields to be added to the 

624 Ticket instance. 

625 

626 Returns: 

627 **dict**: The current fields attribute of the ticket. 

628 

629 """ 

630 if isinstance(fields, dict): 

631 self.fields.update(fields) 

632 

633 if kwargs and isinstance(kwargs, dict): 

634 self.fields.update(kwargs) 

635 

636 return self.fields 

637 

638 @classmethod 

639 def create_basic(cls, 

640 Title=None, # noqa: N803 

641 QueueID=None, # noqa: N803 

642 Queue=None, # noqa: N803 

643 TypeID=None, # noqa: N803 

644 Type=None, # noqa: N803 

645 StateID=None, # noqa: N803 

646 State=None, # noqa: N803 

647 PriorityID=None, # noqa: N803 

648 Priority=None, # noqa: N803 

649 CustomerUser=None): # noqa: N803 

650 """create basic ticket 

651 

652 Args: 

653 Title (str): OTRS Ticket Title 

654 QueueID (str): OTRS Ticket QueueID (e.g. "1") 

655 Queue (str): OTRS Ticket Queue (e.g. "raw") 

656 TypeID (str): OTRS Ticket TypeID (e.g. "1") 

657 Type (str): OTRS Ticket Type (e.g. "Problem") 

658 StateID (str): OTRS Ticket StateID (e.g. "1") 

659 State (str): OTRS Ticket State (e.g. "open" or "new") 

660 PriorityID (str): OTRS Ticket PriorityID (e.g. "1") 

661 Priority (str): OTRS Ticket Priority (e.g. "low") 

662 CustomerUser (str): OTRS Ticket CustomerUser 

663 

664 Returns: 

665 **Ticket**: A new Ticket object. 

666 

667 """ 

668 if not Title: 

669 raise ArgumentMissingError("Title is required") 

670 

671 if not Queue and not QueueID: 

672 raise ArgumentMissingError("Either Queue or QueueID required") 

673 

674 if not State and not StateID: 

675 raise ArgumentMissingError("Either State or StateID required") 

676 

677 if not Priority and not PriorityID: 

678 raise ArgumentMissingError("Either Priority or PriorityID required") 

679 

680 if not CustomerUser: 

681 raise ArgumentMissingError("CustomerUser is required") 

682 

683 if Type and TypeID: 

684 raise ArgumentInvalidError("Either Type or TypeID - not both") 

685 

686 dct = {"Title": Title} 

687 

688 if Queue: 

689 dct.update({"Queue": Queue}) 

690 else: 

691 dct.update({"QueueID": QueueID}) 

692 

693 if Type: 

694 dct.update({"Type": Type}) 

695 if TypeID: 

696 dct.update({"TypeID": TypeID}) 

697 

698 if State: 

699 dct.update({"State": State}) 

700 else: 

701 dct.update({"StateID": StateID}) 

702 

703 if Priority: 

704 dct.update({"Priority": Priority}) 

705 else: 

706 dct.update({"PriorityID": PriorityID}) 

707 

708 dct.update({"CustomerUser": CustomerUser}) 

709 

710 for key, value in dct.items(): 

711 dct.update({key: value}) 

712 

713 return Ticket(dct) 

714 

715 @classmethod 

716 def _dummy(cls): 

717 """dummy data (for testing) 

718 

719 Returns: 

720 **Ticket**: A Ticket object. 

721 

722 """ 

723 return Ticket.create_basic(Queue="Raw", 

724 State="open", 

725 Priority="3 normal", 

726 CustomerUser="root@localhost", 

727 Title="Bäsic Ticket") 

728 

729 @staticmethod 

730 def datetime_to_pending_time_text(datetime_object=None): 

731 """datetime_to_pending_time_text 

732 

733 Args: 

734 datetime_object (Datetime) 

735 

736 Returns: 

737 **str**: The pending time in the format required for OTRS REST interface. 

738 

739 """ 

740 return { 

741 "Year": datetime_object.year, 

742 "Month": datetime_object.month, 

743 "Day": datetime_object.day, 

744 "Hour": datetime_object.hour, 

745 "Minute": datetime_object.minute 

746 } 

747 

748 

749class SessionStore: 

750 """Session ID: persistently store to and retrieve from to file 

751 

752 Args: 

753 file_path (str): Path on disc 

754 session_timeout (int): OTRS Session Timeout Value (to avoid reusing outdated session id 

755 value (str): A Session ID as str 

756 created (int): seconds as epoch when a session_id record was created 

757 expires (int): seconds as epoch when a session_id record expires 

758 is_legacy (bool): whether the Session ID is for an older OTRS Version (<=7) 

759 

760 Raises: 

761 ArgumentMissingError 

762 

763 """ 

764 

765 def __init__(self, file_path=None, session_timeout=None, 

766 value=None, created=None, expires=None, is_legacy=False): 

767 if not file_path: 

768 raise ArgumentMissingError("Argument file_path is required!") 

769 

770 if not session_timeout: 

771 raise ArgumentMissingError("Argument session_timeout is required!") 

772 

773 self.file_path = file_path 

774 self.timeout = session_timeout 

775 self.value = value 

776 self.created = created 

777 self.expires = expires 

778 self.is_legacy = is_legacy 

779 

780 def __repr__(self): 

781 return f"<{self.__class__.__name__}: {self.file_path}>" 

782 

783 def read(self): 

784 """Retrieve a stored Session ID from file 

785 

786 Returns: 

787 **str** or **None**: Retrieved Session ID or None (if none could be read) 

788 

789 """ 

790 if not os.path.isfile(self.file_path): 

791 return None 

792 

793 if not SessionStore._validate_file_owner_and_permissions(self.file_path): 

794 return None 

795 

796 with open(self.file_path) as f: 

797 content = f.read() 

798 try: 

799 data = json.loads(content) 

800 self.value = data['session_id'] 

801 self.is_legacy = data.get('is_legacy', False) 

802 

803 self.created = datetime.datetime.utcfromtimestamp(int(data['created'])) 

804 self.expires = (self.created + datetime.timedelta(seconds=self.timeout)) 

805 

806 if self.expires > datetime.datetime.utcnow(): 

807 return self.value # still valid 

808 except ValueError: 

809 return None 

810 except KeyError: 

811 return None 

812 except Exception as err: 

813 raise Exception(f"Exception Type: {type(err)}: {err}") 

814 

815 def write(self, new_value): 

816 """Write and store a Session ID to file (rw for user only) 

817 

818 Args: 

819 new_value (str): if none then empty value will be writen to file 

820 Returns: 

821 **bool**: **True** if successful, False **otherwise**. 

822 

823 """ 

824 self.value = new_value 

825 

826 if os.path.isfile(self.file_path): 

827 if not SessionStore._validate_file_owner_and_permissions(self.file_path): 

828 raise OSError("File exists but is not ok (wrong owner/permissions)!") 

829 

830 with open(self.file_path, 'w') as f: 

831 f.write(json.dumps({'created': str(int(time.time())), 

832 'session_id': self.value, 

833 'is_legacy': self.is_legacy})) 

834 os.chmod(self.file_path, 384) # 384 is '0600' 

835 

836 # TODO 2016-04-23 (RH): check this 

837 if not SessionStore._validate_file_owner_and_permissions(self.file_path): 

838 raise OSError("Race condition: Something happened to file during the run!") 

839 

840 return True 

841 

842 def delete(self): 

843 """remove session id file (e.g. when it only contains an invalid session id) 

844 

845 Raises: 

846 NotImplementedError 

847 

848 Returns: 

849 **bool**: **True** if successful, otherwise **False**. 

850 

851 .. todo:: 

852 (RH) implement this _remove_session_id_file 

853 """ 

854 raise NotImplementedError("Not yet done") 

855 

856 @staticmethod 

857 def _validate_file_owner_and_permissions(full_file_path): 

858 """validate SessionStore file ownership and permissions 

859 

860 Args: 

861 full_file_path (str): full path to file on disc 

862 

863 Returns: 

864 **bool**: **True** if valid and correct (or running on Windows), otherwise **False**... 

865 

866 """ 

867 if not os.path.isfile(full_file_path): 

868 raise OSError(f"Does not exist or not a file: {full_file_path}") 

869 

870 if OS_TYPE_WINDOWS: # We have to do this, as os.getuid() is not defined if run on Windows 

871 return True 

872 

873 file_lstat = os.lstat(full_file_path) 

874 if not file_lstat.st_uid == os.getuid(): 

875 return False 

876 

877 if not file_lstat.st_mode & 0o777 == 384: 

878 """ check for unix permission User R+W only (0600) 

879 >>> oct(384) 

880 '0600' Python 2 

881 >>> oct(384) 

882 '0o600' Python 3 """ 

883 return False 

884 

885 return True 

886 

887 

888class Client: 

889 """PyOTRS Client class - includes Session handling 

890 

891 Args: 

892 baseurl (str): Base URL for OTRS System, no trailing slash e.g. http://otrs.example.com 

893 username (str): Username 

894 password (str): Password 

895 session_id_file (str): Session ID path on disc, used to persistently store Session ID 

896 session_timeout (int): Session Timeout configured in OTRS (usually 28800 seconds = 8h) 

897 session_validation_ticket_id (int): Ticket ID of an existing ticket - used to perform 

898 several check - e.g. validate log in (defaults to 1) 

899 webservice_config_ticket (dict): OTRS REST Web Service Name - Ticket Connector 

900 webservice_config_faq (dict): OTRS REST Web Service Name - FAQ Connector (FAQ code was 

901 deprecated in 0.4.0 and removed in 1.0.0 - parameter stays for compatibility) 

902 webservice_config_link (dict): OTRS REST Web Service Name - Link Connector 

903 proxies (dict): Proxy settings - refer to requests docs for 

904 more information - default to no proxies 

905 https_verify (bool): Should HTTPS certificates be verified (defaults to True) 

906 ca_cert_bundle (str): file path - if specified overrides python/system default for 

907 Root CA bundle that will be used. 

908 auth (tuple): e.g. ("user", "pass") - see requests documentation ("auth") for details 

909 client_auth_cert (str): file path containing both certificate and key (unencrypted) in 

910 PEM format to use for TLS client authentication (passed to requests as "cert") 

911 customer_user (bool): flag to indicate that the username is for a "CustomerUser" 

912 (defaults to False) 

913 user_agent (str): optional HTTP UserAgent string 

914 webservice_path (str): OTRS REST Web Service Path part - defaults to 

915 "/otrs/nph-genericinterface.pl/Webservice/" 

916 use_legacy_sessions (bool): if True, use sessions compatibility for OTRS < V8 

917 request_timeout (float or tuple): optional How many seconds to wait for the server 

918 to send data before giving up, as a float, or a (connect timeout, read timeout) tuple 

919 

920 """ 

921 

922 def __init__(self, 

923 baseurl=None, 

924 username=None, 

925 password=None, 

926 session_id_file=None, 

927 session_timeout=None, 

928 session_validation_ticket_id=1, 

929 webservice_config_ticket=None, 

930 webservice_config_faq=None, 

931 webservice_config_link=None, 

932 proxies=None, 

933 https_verify=True, 

934 ca_cert_bundle=None, 

935 auth=None, 

936 client_auth_cert=None, 

937 customer_user=False, 

938 user_agent=None, 

939 webservice_path="/otrs/nph-genericinterface.pl/Webservice/", 

940 use_legacy_sessions=False, 

941 request_timeout=None 

942 ): 

943 

944 if not baseurl: 

945 raise ArgumentMissingError("baseurl") 

946 self.baseurl = baseurl.rstrip("/") 

947 self.webservice_path = webservice_path 

948 

949 if not session_timeout: 

950 self.session_timeout = 28800 # 8 hours is OTRS default 

951 else: 

952 self.session_timeout = session_timeout 

953 

954 if not session_id_file: 

955 if OS_TYPE_WINDOWS: 

956 # No '/tmp/...' available on Windows OS, so using in Windows Temp folder instead 

957 p = os.path.expanduser('~\\AppData\\Local\\Temp\\.pyotrs_session_id') 

958 session_id_path = p 

959 else: 

960 session_id_path = "/tmp/.pyotrs_session_id" # Default for Linux 

961 

962 self.session_id_store = SessionStore(file_path=session_id_path, 

963 session_timeout=self.session_timeout, 

964 is_legacy=use_legacy_sessions) 

965 else: 

966 self.session_id_store = SessionStore(file_path=session_id_file, 

967 session_timeout=self.session_timeout, 

968 is_legacy=use_legacy_sessions) 

969 

970 self.session_validation_ticket_id = session_validation_ticket_id 

971 

972 # A dictionary for mapping OTRS WebService operations to HTTP Method, Route and 

973 # Result string. 

974 if not webservice_config_ticket: 

975 webservice_config_ticket = TICKET_CONNECTOR_CONFIG_DEFAULT 

976 

977 if not webservice_config_link: 

978 webservice_config_link = LINK_CONNECTOR_CONFIG_DEFAULT 

979 

980 self.ws_ticket = webservice_config_ticket['Name'] 

981 self.ws_link = webservice_config_link['Name'] 

982 

983 self.routes_ticket = [x[1]["Route"] for x in webservice_config_ticket['Config'].items()] 

984 self.routes_link = [x[1]["Route"] for x in webservice_config_link['Config'].items()] 

985 

986 webservice_config = {} 

987 webservice_config.update(webservice_config_ticket['Config']) 

988 webservice_config.update(webservice_config_link['Config']) 

989 self.ws_config = webservice_config 

990 

991 if not proxies: 

992 self.proxies = {"http": "", "https": "", "no": ""} 

993 else: 

994 if not isinstance(proxies, dict): 

995 raise ValueError("Proxy settings need to be provided as dict!") 

996 self.proxies = proxies 

997 

998 if https_verify: 

999 if not ca_cert_bundle: 

1000 self.https_verify = https_verify 

1001 else: 

1002 ca_certs = os.path.abspath(ca_cert_bundle) 

1003 if not os.path.isfile(ca_certs): 

1004 raise ValueError(f"Certificate file does not exist: {ca_certs}") 

1005 self.https_verify = ca_certs 

1006 else: 

1007 self.https_verify = False 

1008 

1009 self.auth = auth 

1010 self.client_auth_cert = client_auth_cert 

1011 self.request_timeout = request_timeout 

1012 

1013 self.customer_user = customer_user 

1014 

1015 self.user_agent = user_agent 

1016 

1017 self.use_legacy_sessions = use_legacy_sessions 

1018 

1019 # credentials 

1020 self.username = username 

1021 self.password = password 

1022 

1023 # dummy initialization 

1024 self.operation = None 

1025 self.result_json = None 

1026 self.result = [] 

1027 

1028 """ 

1029 Returns the correct session key to look for/send based 

1030 on the current session compatibility level 

1031 for legacy sessions (< OTRS V8) this returns 'SessionID' 

1032 from V8 onwards this returns 'AccessToken' 

1033 """ 

1034 

1035 @property 

1036 def _session_key(self): 

1037 if self.use_legacy_sessions: 

1038 return 'SessionID' 

1039 else: 

1040 return 'AccessToken' 

1041 

1042 """ 

1043 GenericInterface::Operation::Session::SessionCreate 

1044 * session_check_is_valid 

1045 * session_create 

1046 * session_get 

1047 * session_restore_or_set_up_new # try to get session_id from a (json) file on disc 

1048 """ 

1049 

1050 @deprecation.deprecated(deprecated_in="0.10.0", removed_in="2.0", current_version=__version__, 

1051 details="This method was implemented with a \"dirty\" workaround and " 

1052 "should not be called anymore. Use Client.session_get() " 

1053 "instead. ATTENTION: Using session_get() requires the OTRS " 

1054 "route for HTTP GET /Session to be configured (which is/was " 

1055 "not the default).") 

1056 def session_check_is_valid(self, session_id=None): 

1057 """check whether session_id is currently valid 

1058 

1059 Args: 

1060 session_id (str): optional if set overrides the self.session_id 

1061 

1062 Raises: 

1063 ArgumentMissingError: if session_id is not set 

1064 

1065 Returns: 

1066 **bool**: **True** if valid, otherwise **False**. 

1067 

1068 .. note:: 

1069 Uses HTTP Method: GET 

1070 """ 

1071 self.operation = "TicketGet" 

1072 

1073 if not session_id: 

1074 raise ArgumentMissingError("session_id") 

1075 

1076 # TODO 2016-04-13 (RH): Is there a nicer way to check whether session is valid?! 

1077 payload = {self._session_key: session_id} 

1078 

1079 response = self._send_request(payload, data_id=self.session_validation_ticket_id) 

1080 return self._parse_and_validate_response(response) 

1081 

1082 def session_create(self): 

1083 """create new (temporary) session (and Session ID) 

1084 

1085 Returns: 

1086 **bool**: **True** if successful, otherwise **False**. 

1087 

1088 .. note:: 

1089 Session ID is recorded in self.session_id_store.value (**non persistently**) 

1090 

1091 .. note:: 

1092 Uses HTTP Method: POST 

1093 

1094 """ 

1095 if self.use_legacy_sessions: 

1096 self.operation = "SessionCreate" 

1097 else: 

1098 self.operation = "AccessTokenCreate" 

1099 

1100 if self.customer_user: 

1101 payload = { 

1102 "CustomerUserLogin": self.username, 

1103 "Password": self.password 

1104 } 

1105 else: 

1106 payload = { 

1107 "UserLogin": self.username, 

1108 "Password": self.password 

1109 } 

1110 

1111 response = self._send_request(payload) 

1112 try: 

1113 if not self._parse_and_validate_response(response): 

1114 return False 

1115 except ResponseParseError: 

1116 if not self.use_legacy_sessions: 

1117 log.warning("AccessTokenCreate failed, retrying using legacy session") 

1118 self.use_legacy_sessions = True 

1119 self.session_id_store.is_legacy = True 

1120 return self.session_create() 

1121 return False 

1122 

1123 self.session_id_store.value = self.result_json[self._session_key] 

1124 return True 

1125 

1126 def session_get(self, session_id=None): 

1127 """get/check/validate a Session ID 

1128 

1129 Returns: 

1130 **bool**: **True** if successful, otherwise **False**. 

1131 

1132 .. note:: 

1133 Uses HTTP Method: GET 

1134 

1135 """ 

1136 self.operation = "SessionGet" 

1137 

1138 if not session_id: 

1139 raise ArgumentMissingError("session_id") 

1140 

1141 payload = {self._session_key: session_id} 

1142 

1143 response = self._send_request(payload, data_id=session_id) 

1144 return self._parse_and_validate_response(response) 

1145 

1146 def session_restore_or_create(self): 

1147 """Try to restore Session ID from file otherwise create new one and save to file 

1148 

1149 Raises: 

1150 SessionCreateError 

1151 SessionIDFileError 

1152 

1153 .. note:: 

1154 Session ID is recorded in self.session_id_store.value (**non persistently**) 

1155 

1156 .. note:: 

1157 Session ID is **saved persistently** to file: *self.session_id_store.file_path* 

1158 

1159 Returns: 

1160 **bool**: **True** if successful, otherwise **False**. 

1161 """ 

1162 # try to read session_id from file 

1163 self.session_id_store.value = self.session_id_store.read() 

1164 

1165 if self.session_id_store.value: 

1166 # got one.. use stored is_legacy status info 

1167 self.use_legacy_sessions = self.session_id_store.is_legacy 

1168 # and the check whether it's still valid 

1169 if self.session_get(self.session_id_store.value): 

1170 log.info("Using valid Session ID " 

1171 f"from ({self.session_id_store.file_path})") 

1172 return True 

1173 

1174 # got no (valid) session_id; clean store 

1175 self.session_id_store.write("") 

1176 

1177 # and try to create new one 

1178 if not self.session_create(): 

1179 raise SessionCreateError("Failed to create a Session ID!") 

1180 

1181 # save new created session_id to file 

1182 if not self.session_id_store.write(self.result_json[self._session_key]): 

1183 raise OSError("Failed to save Session ID to file!") 

1184 else: 

1185 log.info("Saved new Session ID to file: " 

1186 f"{self.session_id_store.file_path}") 

1187 return True 

1188 

1189 @deprecation.deprecated(deprecated_in="0.10.0", removed_in="2.0", current_version=__version__, 

1190 details="This method uses session_check_is_valid which was " 

1191 "implemented with a \"dirty\" workaround and should not be " 

1192 "called anymore. Use Client.session_restore_or_create() " 

1193 "instead. ATTENTION: Using that also switches to " 

1194 "session_get() which requires the OTRS route for HTTP GET " 

1195 "/Session to be configured (which is/was not the default).") 

1196 def session_restore_or_set_up_new(self): 

1197 """Try to restore Session ID from file otherwise create new one and save to file 

1198 

1199 Raises: 

1200 SessionCreateError 

1201 SessionIDFileError 

1202 

1203 .. note:: 

1204 Session ID is recorded in self.session_id_store.value (**non persistently**) 

1205 

1206 .. note:: 

1207 Session ID is **saved persistently** to file: *self.session_id_store.file_path* 

1208 

1209 Returns: 

1210 **bool**: **True** if successful, otherwise **False**. 

1211 """ 

1212 # try to read session_id from file 

1213 self.session_id_store.value = self.session_id_store.read() 

1214 

1215 if self.session_id_store.value: 

1216 # got one.. check whether it's still valid 

1217 try: 

1218 if self.session_check_is_valid(self.session_id_store.value): 

1219 log.info("Using valid Session ID " 

1220 f"from ({self.session_id_store.file_path})") 

1221 return True 

1222 except APIError: 

1223 """Most likely invalid session_id. Remove it from session_id_store.""" 

1224 pass 

1225 

1226 # got no (valid) session_id; clean store 

1227 self.session_id_store.write("") 

1228 

1229 # and try to create new one 

1230 if not self.session_create(): 

1231 raise SessionCreateError("Failed to create a Session ID!") 

1232 

1233 # save new created session_id to file 

1234 if not self.session_id_store.write(self.result_json[self._session_key]): 

1235 raise OSError("Failed to save Session ID to file!") 

1236 else: 

1237 log.info("Saved new Session ID to file: " 

1238 f"{self.session_id_store.file_path}") 

1239 return True 

1240 

1241 """ 

1242 GenericInterface::Operation::Ticket::TicketCreate 

1243 * ticket_create 

1244 """ 

1245 

1246 def ticket_create(self, 

1247 ticket=None, 

1248 article=None, 

1249 attachments=None, 

1250 dynamic_fields=None, 

1251 **kwargs): 

1252 """Create a Ticket 

1253 

1254 Args: 

1255 ticket (Ticket): a ticket object 

1256 article (Article): optional article 

1257 attachments (list): *Attachment* objects 

1258 dynamic_fields (list): *DynamicField* object 

1259 **kwargs: any regular OTRS Fields (not for Dynamic Fields!) 

1260 

1261 Returns: 

1262 **dict** or **False**: dict if successful, otherwise **False**. 

1263 """ 

1264 if not self.session_id_store.value: 

1265 raise SessionNotCreated("Call session_create() or " 

1266 "session_restore_or_create() first") 

1267 self.operation = "TicketCreate" 

1268 

1269 payload = {self._session_key: self.session_id_store.value} 

1270 

1271 if not ticket: 

1272 raise ArgumentMissingError("Ticket") 

1273 

1274 if not article: 

1275 raise ArgumentMissingError("Article") 

1276 

1277 if isinstance(kwargs, dict) and len(kwargs) > 0: 

1278 ticket.field_add(kwargs) 

1279 

1280 payload.update(ticket.to_dct()) 

1281 

1282 if article: 

1283 article.validate() 

1284 payload.update({"Article": article.to_dct()}) 

1285 

1286 if attachments: 

1287 # noinspection PyTypeChecker 

1288 payload.update({"Attachment": [att.to_dct() for att in attachments]}) 

1289 

1290 if dynamic_fields: 

1291 # noinspection PyTypeChecker 

1292 payload.update({"DynamicField": [df.to_dct() for df in dynamic_fields]}) 

1293 

1294 if not self._parse_and_validate_response(self._send_request(payload)): 

1295 return False 

1296 else: 

1297 return self.result_json 

1298 

1299 """ 

1300 GenericInterface::Operation::Ticket::TicketGet 

1301 

1302 * ticket_get_by_id 

1303 * ticket_get_by_list 

1304 * ticket_get_by_number 

1305 """ 

1306 

1307 def ticket_get_by_id(self, 

1308 ticket_id, 

1309 articles=False, 

1310 attachments=False, 

1311 dynamic_fields=True, 

1312 html_body_as_attachment=False): 

1313 """ticket_get_by_id 

1314 

1315 Args: 

1316 ticket_id (int): Integer value of a Ticket ID 

1317 attachments (bool): will request OTRS to include attachments (*default: False*) 

1318 articles (bool): will request OTRS to include all 

1319 Articles (*default: False*) 

1320 dynamic_fields (bool): will request OTRS to include all 

1321 Dynamic Fields (*default: True*) 

1322 html_body_as_attachment (bool): Optional, If enabled the HTML body version of 

1323 each article is added to the attachments list 

1324 

1325 Returns: 

1326 **Ticket** or **False**: Ticket object if successful, otherwise **False**. 

1327 

1328 """ 

1329 if not self.session_id_store.value: 

1330 raise SessionNotCreated("Call session_create() or " 

1331 "session_restore_or_create() first") 

1332 self.operation = "TicketGet" 

1333 

1334 payload = { 

1335 self._session_key: self.session_id_store.value, 

1336 "TicketID": f"{ticket_id}", 

1337 "AllArticles": int(articles), 

1338 "Attachments": int(attachments), 

1339 "DynamicFields": int(dynamic_fields), 

1340 "HTMLBodyAsAttachment": int(html_body_as_attachment), 

1341 } 

1342 

1343 response = self._send_request(payload, ticket_id) 

1344 if not self._parse_and_validate_response(response): 

1345 return False 

1346 else: 

1347 return self.result[0] 

1348 

1349 def ticket_get_by_list(self, 

1350 ticket_id_list, 

1351 articles=False, 

1352 attachments=False, 

1353 dynamic_fields=True, 

1354 html_body_as_attachment=False): 

1355 """ticket_get_by_list 

1356 

1357 Args: 

1358 ticket_id_list (list): List of either String or Integer values 

1359 attachments (bool): will request OTRS to include attachments (*default: False*) 

1360 articles (bool): will request OTRS to include all 

1361 Articles (*default: False*) 

1362 dynamic_fields (bool): will request OTRS to include all 

1363 Dynamic Fields (*default: True*) 

1364 html_body_as_attachment (bool): Optional, If enabled the HTML body version of 

1365 each article is added to the attachments list 

1366 

1367 Returns: 

1368 **list**: Ticket objects (as list) if successful, otherwise **False**. 

1369 

1370 """ 

1371 if not self.session_id_store.value: 

1372 raise SessionNotCreated("Call session_create() or " 

1373 "session_restore_or_create() first") 

1374 self.operation = "TicketGetList" 

1375 

1376 if not isinstance(ticket_id_list, list): 

1377 raise ArgumentInvalidError("Please provide list of IDs!") 

1378 

1379 # When you ask with an empty ticket_id_list, you get an empty response 

1380 if not ticket_id_list: 

1381 return [] 

1382 

1383 payload = { 

1384 self._session_key: self.session_id_store.value, 

1385 "TicketID": ','.join([str(item) for item in ticket_id_list]), 

1386 "AllArticles": int(articles), 

1387 "Attachments": int(attachments), 

1388 "DynamicFields": int(dynamic_fields), 

1389 "HTMLBodyAsAttachment": int(html_body_as_attachment), 

1390 } 

1391 

1392 if not self._parse_and_validate_response(self._send_request(payload)): 

1393 return False 

1394 else: 

1395 return self.result 

1396 

1397 def ticket_get_by_number(self, 

1398 ticket_number, 

1399 articles=False, 

1400 attachments=False, 

1401 dynamic_fields=True, 

1402 html_body_as_attachment=False): 

1403 """ticket_get_by_number 

1404 

1405 Args: 

1406 ticket_number (str): Ticket Number as str 

1407 attachments (bool): will request OTRS to include attachments (*default: False*) 

1408 articles (bool): will request OTRS to include all 

1409 Articles (*default: False*) 

1410 dynamic_fields (bool): will request OTRS to include all 

1411 Dynamic Fields (*default: True*) 

1412 html_body_as_attachment (bool): Optional, If enabled the HTML body version of 

1413 each article is added to the attachments list 

1414 

1415 Raises: 

1416 ValueError 

1417 

1418 Returns: 

1419 **Ticket** or **False**: Ticket object if successful, otherwise **False**. 

1420 

1421 """ 

1422 if isinstance(ticket_number, int): 

1423 raise ArgumentInvalidError("Provide ticket_number as str/unicode. " 

1424 "Got ticket_number as int.") 

1425 result_list = self.ticket_search(TicketNumber=ticket_number) 

1426 

1427 if not result_list: 

1428 return False 

1429 

1430 if len(result_list) == 1: 

1431 result = self.ticket_get_by_id(result_list[0], 

1432 articles=articles, 

1433 attachments=attachments, 

1434 dynamic_fields=dynamic_fields, 

1435 html_body_as_attachment=html_body_as_attachment) 

1436 if not result: 

1437 return False 

1438 else: 

1439 return result 

1440 else: 

1441 # TODO 2016-11-12 (RH): more than one ticket found for a specific ticket number 

1442 raise ValueError("Found more than one result for " 

1443 f"Ticket Number: {ticket_number}") 

1444 

1445 """ 

1446 GenericInterface::Operation::Ticket::TicketHistoryGet 

1447 

1448 * ticket_history_get_by_id 

1449 """ 

1450 

1451 def ticket_history_get_by_id(self, ticket_id): 

1452 """ticket_history_get_by_id 

1453 

1454 Args: 

1455 ticket_id (int): Integer value of a Ticket ID 

1456 

1457 Returns: 

1458 **dict** or **False**: A dict("History") containing a list of dicts, otherwise **False**. 

1459 

1460 """ 

1461 if not self.session_id_store.value: 

1462 raise SessionNotCreated("Call session_create() or " 

1463 "session_restore_or_create() first") 

1464 self.operation = "TicketHistoryGet" 

1465 

1466 payload = { 

1467 self._session_key: self.session_id_store.value, 

1468 "TicketID": f"{ticket_id}" 

1469 } 

1470 

1471 response = self._send_request(payload, ticket_id) 

1472 if not self._parse_and_validate_response(response): 

1473 return False 

1474 else: 

1475 return self.result[0] 

1476 

1477 """ 

1478 GenericInterface::Operation::Ticket::TicketSearch 

1479 * ticket_search 

1480 * ticket_search_full_text 

1481 """ 

1482 

1483 def ticket_search(self, dynamic_fields=None, **kwargs): 

1484 """Search for ticket 

1485 

1486 Args: 

1487 dynamic_fields (list): List of DynamicField objects for which the search 

1488 should be performed 

1489 **kwargs: Arbitrary keyword arguments (not for DynamicField objects). 

1490 

1491 Returns: 

1492 **list** or **False**: The search result (as list) if successful (can be an 

1493 empty list: []), otherwise **False**. 

1494 

1495 .. note:: 

1496 If value of kwargs is a datetime object then this object will be 

1497 converted to the appropriate string format for OTRS API. 

1498 

1499 """ 

1500 if not self.session_id_store.value: 

1501 raise SessionNotCreated("Call session_create() or " 

1502 "session_restore_or_create() first") 

1503 self.operation = "TicketSearch" 

1504 payload = { 

1505 self._session_key: self.session_id_store.value, 

1506 } 

1507 

1508 if dynamic_fields: 

1509 if isinstance(dynamic_fields, DynamicField): 

1510 payload.update(dynamic_fields.to_dct_search()) 

1511 else: 

1512 for df in dynamic_fields: 

1513 payload.update(df.to_dct_search()) 

1514 

1515 if kwargs is not None: 

1516 for key, value in kwargs.items(): 

1517 if isinstance(value, datetime.datetime): 

1518 value = value.strftime("%Y-%m-%d %H:%M:%S") 

1519 payload.update({key: value}) 

1520 

1521 if not self._parse_and_validate_response(self._send_request(payload)): 

1522 return False 

1523 else: 

1524 return self.result 

1525 

1526 def ticket_search_full_text(self, pattern): 

1527 """Wrapper for search ticket for full text search 

1528 

1529 Args: 

1530 pattern (str): Search pattern (a '%' will be added to front and end automatically) 

1531 

1532 Returns: 

1533 **list** or **False**: The search result (as list) if successful, 

1534 otherwise **False**. 

1535 

1536 """ 

1537 self.operation = "TicketSearch" 

1538 pattern_wildcard = f"%{pattern}%" 

1539 

1540 if self.use_legacy_sessions: 

1541 return self.ticket_search(FullTextIndex="1", 

1542 ContentSearch="OR", 

1543 Subject=pattern_wildcard, 

1544 Body=pattern_wildcard) 

1545 else: 

1546 return self.ticket_search(FullTextIndex="1", 

1547 ContentSearch="OR", 

1548 MIMEBase_Subject=pattern_wildcard, 

1549 MIMEBase_Body=pattern_wildcard) 

1550 

1551 """ 

1552 GenericInterface::Operation::Ticket::TicketUpdate 

1553 * ticket_update 

1554 * ticket_update_set_pending 

1555 """ 

1556 

1557 def ticket_update(self, 

1558 ticket_id, 

1559 article=None, 

1560 attachments=None, 

1561 dynamic_fields=None, 

1562 **kwargs): 

1563 """Update a Ticket 

1564 

1565 Args: 

1566 

1567 ticket_id (int): Ticket ID as integer value 

1568 article (Article): **optional** one *Article* that will be add to the ticket 

1569 attachments (list): list of one or more *Attachment* objects that will 

1570 be added to ticket. Also requires an *Article*! 

1571 dynamic_fields (list): *DynamicField* objects 

1572 **kwargs: any regular Ticket Fields (not for Dynamic Fields!) 

1573 

1574 Returns: 

1575 **dict** or **False**: A dict if successful, otherwise **False**. 

1576 """ 

1577 if not self.session_id_store.value: 

1578 raise SessionNotCreated("Call session_create() or " 

1579 "session_restore_or_create() first") 

1580 self.operation = "TicketUpdate" 

1581 

1582 payload = {self._session_key: self.session_id_store.value, "TicketID": ticket_id} 

1583 

1584 if article: 

1585 article.validate() 

1586 payload.update({"Article": article.to_dct()}) 

1587 

1588 if attachments: 

1589 if not article: 

1590 raise ArgumentMissingError("To create an attachment an article is needed!") 

1591 # noinspection PyTypeChecker 

1592 payload.update({"Attachment": [att.to_dct() for att in attachments]}) 

1593 

1594 if dynamic_fields: 

1595 # noinspection PyTypeChecker 

1596 payload.update({"DynamicField": [df.to_dct() for df in dynamic_fields]}) 

1597 

1598 if kwargs is not None and not kwargs == {}: 

1599 ticket_dct = {} 

1600 for key, value in kwargs.items(): 

1601 ticket_dct.update({key: value}) 

1602 payload.update({"Ticket": ticket_dct}) 

1603 

1604 if not self._parse_and_validate_response(self._send_request(payload, ticket_id)): 

1605 return False 

1606 

1607 return self.result_json 

1608 

1609 def ticket_update_set_pending(self, 

1610 ticket_id, 

1611 new_state="pending reminder", 

1612 pending_days=1, 

1613 pending_hours=0): 

1614 """ticket_update_set_state_pending 

1615 

1616 Args: 

1617 ticket_id (int): Ticket ID as integer value 

1618 new_state (str): defaults to "pending reminder" 

1619 pending_days (int): defaults to 1 

1620 pending_hours (int): defaults to 0 

1621 

1622 Returns: 

1623 **dict** or **False**: A dict if successful, otherwise **False**. 

1624 

1625 .. note:: 

1626 Operates in UTC 

1627 """ 

1628 datetime_now = datetime.datetime.utcnow() 

1629 pending_till = datetime_now + datetime.timedelta(days=pending_days, hours=pending_hours) 

1630 

1631 pt = Ticket.datetime_to_pending_time_text(datetime_object=pending_till) 

1632 

1633 return self.ticket_update(ticket_id, State=new_state, PendingTime=pt) 

1634 

1635 """ 

1636 GenericInterface::Operation::Link::LinkAdd 

1637 * link_add 

1638 """ 

1639 

1640 def link_add(self, 

1641 src_object_id, 

1642 dst_object_id, 

1643 src_object_type="Ticket", 

1644 dst_object_type="Ticket", 

1645 link_type="Normal", 

1646 state="Valid"): 

1647 """link_add 

1648 

1649 Args: 

1650 src_object_id (int): Integer value of source object ID 

1651 dst_object_id (int): Integer value of destination object ID 

1652 src_object_type (str): Object type of source; e.g. "Ticket", "FAQ"... 

1653 (*default: Ticket*) 

1654 dst_object_type (str): Object type of destination; e.g. "Ticket", "FAQ"... 

1655 (*default: Ticket*) 

1656 link_type (str): Type of the link: "Normal" or "ParentChild" (*default: Normal*) 

1657 state (str): State of the link (*default: Normal*) 

1658 

1659 Returns: 

1660 **True** or **False**: True if successful, otherwise **False**. 

1661 

1662 """ 

1663 if not self.session_id_store.value: 

1664 raise SessionNotCreated("Call session_create() or " 

1665 "session_restore_or_create() first") 

1666 self.operation = "LinkAdd" 

1667 

1668 payload = { 

1669 self._session_key: self.session_id_store.value, 

1670 "SourceObject": src_object_type, 

1671 "SourceKey": int(src_object_id), 

1672 "TargetObject": dst_object_type, 

1673 "TargetKey": int(dst_object_id), 

1674 "Type": link_type, 

1675 "State": state 

1676 } 

1677 

1678 return self._parse_and_validate_response(self._send_request(payload)) 

1679 

1680 """ 

1681 GenericInterface::Operation::Link::LinkDelete 

1682 * link_delete 

1683 """ 

1684 

1685 def link_delete(self, 

1686 src_object_id, 

1687 dst_object_id, 

1688 src_object_type="Ticket", 

1689 dst_object_type="Ticket", 

1690 link_type="Normal"): 

1691 """link_delete 

1692 

1693 Args: 

1694 src_object_id (int): Integer value of source object ID 

1695 src_object_type (str): Object type of source; e.g. "Ticket", "FAQ"... 

1696 (*default: Ticket*) 

1697 dst_object_id (int): Integer value of source object ID 

1698 dst_object_type (str): Object type of source; e.g. "Ticket", "FAQ"... 

1699 (*default: Ticket*) 

1700 link_type (str): Type of the link: "Normal" or "ParentChild" (*default: Normal*) 

1701 

1702 Returns: 

1703 **True** or **False**: True if successful, otherwise **False**. 

1704 

1705 """ 

1706 if not self.session_id_store.value: 

1707 raise SessionNotCreated("Call session_create() or " 

1708 "session_restore_or_create() first") 

1709 self.operation = "LinkDelete" 

1710 

1711 payload = { 

1712 self._session_key: self.session_id_store.value, 

1713 "Object1": src_object_type, 

1714 "Key1": int(src_object_id), 

1715 "Object2": dst_object_type, 

1716 "Key2": int(dst_object_id), 

1717 "Type": link_type 

1718 } 

1719 

1720 return self._parse_and_validate_response(self._send_request(payload)) 

1721 

1722 """ 

1723 GenericInterface::Operation::Link::LinkDeleteAll 

1724 * link_delete_all 

1725 """ 

1726 

1727 def link_delete_all(self, 

1728 object_id, 

1729 object_type="Ticket"): 

1730 """link_delete_all 

1731 

1732 Args: 

1733 object_id (int): Integer value of source object ID 

1734 object_type (str): Object type of source; e.g. "Ticket", "FAQ"... 

1735 (*default: Ticket*) 

1736 

1737 Returns: 

1738 **True** or **False**: True if successful, otherwise **False**. 

1739 

1740 """ 

1741 if not self.session_id_store.value: 

1742 raise SessionNotCreated("Call session_create() or " 

1743 "session_restore_or_create() first") 

1744 self.operation = "LinkDeleteAll" 

1745 

1746 payload = { 

1747 self._session_key: self.session_id_store.value, 

1748 "Object": object_type, 

1749 "Key": int(object_id) 

1750 } 

1751 

1752 return self._parse_and_validate_response(self._send_request(payload)) 

1753 

1754 """ 

1755 GenericInterface::Operation::Link::LinkList 

1756 * link_list 

1757 """ 

1758 

1759 def link_list(self, 

1760 src_object_id, 

1761 src_object_type="Ticket", 

1762 dst_object_type=None, 

1763 state="Valid", 

1764 link_type=None, 

1765 direction=None): 

1766 """link_list 

1767 

1768 Args: 

1769 src_object_id (int): Integer value of source object ID 

1770 src_object_type (str): Object type of source; e.g. "Ticket", "FAQ"... 

1771 (*default: Ticket*) 

1772 dst_object_type (str): Object type of destination; e.g. "Ticket", "FAQ"... 

1773 Optional restriction of the object where the links point to. (*default: Ticket*) 

1774 state (str): State of the link (*default: Valid*) 

1775 link_type (str): Type of the link: "Normal" or "ParentChild" (*default: Normal*) 

1776 direction (str): Optional restriction of the link direction ('Source' or 'Target'). 

1777 

1778 Returns: 

1779 **list** or **None**: List of found dict links if successful, if empty **None**. 

1780 

1781 """ 

1782 if not self.session_id_store.value: 

1783 raise SessionNotCreated("Call session_create() or " 

1784 "session_restore_or_create() first") 

1785 self.operation = "LinkList" 

1786 

1787 payload = { 

1788 self._session_key: self.session_id_store.value, 

1789 "Object": src_object_type, 

1790 "Key": int(src_object_id), 

1791 "State": state 

1792 } 

1793 

1794 if dst_object_type: 

1795 payload.update({"Object2": dst_object_type}) 

1796 

1797 if link_type: 

1798 payload.update({"Type": link_type}) 

1799 

1800 if direction: 

1801 payload.update({"Direction": direction}) 

1802 

1803 result = None 

1804 if self._parse_and_validate_response(self._send_request(payload)): 

1805 result = self.result 

1806 return result 

1807 

1808 """ 

1809 GenericInterface::Operation::Link::PossibleLinkList 

1810 * link_possible_link_list 

1811 """ 

1812 

1813 def link_possible_link_list(self): 

1814 """link_possible_link_list 

1815 

1816 Returns: 

1817 **List** or **False**: List if successful, otherwise **False**. 

1818 

1819 """ 

1820 if not self.session_id_store.value: 

1821 raise SessionNotCreated("Call session_create() or " 

1822 "session_restore_or_create() first") 

1823 self.operation = "PossibleLinkList" 

1824 

1825 payload = { 

1826 self._session_key: self.session_id_store.value, 

1827 } 

1828 

1829 if self._parse_and_validate_response(self._send_request(payload)): 

1830 return self.result 

1831 else: 

1832 return False 

1833 

1834 """ 

1835 GenericInterface::Operation::Link::PossibleObjectsList 

1836 * link_possible_objects_list 

1837 """ 

1838 

1839 def link_possible_objects_list(self, 

1840 object_type="Ticket"): 

1841 """link_possible_objects_list 

1842 

1843 Args: 

1844 object_type (str): Object type; e.g. "Ticket", "FAQ"... 

1845 (*default: Ticket*) 

1846 

1847 Returns: 

1848 **List** or **False**: List if successful, otherwise **False**. 

1849 

1850 """ 

1851 if not self.session_id_store.value: 

1852 raise SessionNotCreated("Call session_create() or " 

1853 "session_restore_or_create() first") 

1854 self.operation = "PossibleObjectsList" 

1855 

1856 payload = { 

1857 self._session_key: self.session_id_store.value, 

1858 "Object": object_type, 

1859 } 

1860 

1861 if self._parse_and_validate_response(self._send_request(payload)): 

1862 return self.result 

1863 else: 

1864 return False 

1865 

1866 """ 

1867 GenericInterface::Operation::Link::PossibleTypesList 

1868 * link_possible_types_list 

1869 """ 

1870 

1871 def link_possible_types_list(self, 

1872 src_object_type="Ticket", 

1873 dst_object_type="Ticket"): 

1874 """link_possible_types_list 

1875 

1876 Args: 

1877 src_object_type (str): Object type of source; e.g. "Ticket", "FAQ"... 

1878 (*default: Ticket*) 

1879 dst_object_type (str): Object type of destination; e.g. "Ticket", "FAQ"... 

1880 (*default: Ticket*) 

1881 

1882 Returns: 

1883 **List** or **False**: List if successful, otherwise **False**. 

1884 

1885 """ 

1886 if not self.session_id_store.value: 

1887 raise SessionNotCreated("Call session_create() or " 

1888 "session_restore_or_create() first") 

1889 self.operation = "PossibleTypesList" 

1890 

1891 payload = { 

1892 self._session_key: self.session_id_store.value, 

1893 "Object1": src_object_type, 

1894 "Object2": dst_object_type, 

1895 } 

1896 

1897 if self._parse_and_validate_response(self._send_request(payload)): 

1898 return self.result 

1899 else: 

1900 return False 

1901 

1902 def _build_url(self, data_id=None): 

1903 """build url for request 

1904 

1905 Args: 

1906 data_id (optional[int]) 

1907 

1908 Returns: 

1909 **str**: The complete URL where the request will be send to. 

1910 

1911 """ 

1912 route = self.ws_config[self.operation]["Route"] 

1913 

1914 if ":" in route: 

1915 route_split = route.split(":") 

1916 route = route_split[0] 

1917 route_arg = route_split[1] 

1918 

1919 if route_arg == "TicketID": 

1920 if not data_id: 

1921 raise ValueError("TicketID is None but Route requires " 

1922 f"TicketID: {route}") 

1923 self._url = f"{self.baseurl}{self.webservice_path}{self.ws_ticket}{route}{data_id}" 

1924 elif route_arg == "SessionID": 

1925 if not data_id: 

1926 raise ValueError("SessionID is None but Route requires " 

1927 f"SessionID: {route}") 

1928 self._url = f"{self.baseurl}{self.webservice_path}{self.ws_ticket}{route}{data_id}" 

1929 else: 

1930 if route in self.routes_ticket: 

1931 self._url = f"{self.baseurl}{self.webservice_path}{self.ws_ticket}{route}" 

1932 elif route in self.routes_link: 

1933 self._url = f"{self.baseurl}{self.webservice_path}{self.ws_link}{route}" 

1934 

1935 return self._url 

1936 

1937 def _send_request(self, payload=None, data_id=None): 

1938 """send the API request using the *requests.request* method 

1939 

1940 Args: 

1941 payload (dict) 

1942 data_id (optional[dict]) 

1943 

1944 Raises: 

1945 OTRSHTTPError: 

1946 

1947 Returns: 

1948 **requests.Response**: Response received after sending the request. 

1949 

1950 .. note:: 

1951 Supported HTTP Methods: DELETE, GET, HEAD, PATCH, POST, PUT 

1952 """ 

1953 if not payload: 

1954 raise ArgumentMissingError("payload") 

1955 

1956 self._result_type = self.ws_config[self.operation]["Result"] 

1957 

1958 url = self._build_url(data_id) 

1959 

1960 http_method = self.ws_config[self.operation]["RequestMethod"] 

1961 

1962 if http_method not in ["DELETE", "GET", "HEAD", "PATCH", "POST", "PUT"]: 

1963 raise ValueError("invalid http_method") 

1964 

1965 headers = {} 

1966 

1967 if self.user_agent: 

1968 headers.update({"User-Agent": self.user_agent}) 

1969 

1970 if http_method == "GET": 

1971 

1972 # print("sending {0} to {1} as {2}".format(payload, url, http_method.upper())) 

1973 try: 

1974 response = requests.request("GET", 

1975 url, 

1976 headers=headers, 

1977 params=payload, 

1978 proxies=self.proxies, 

1979 verify=self.https_verify, 

1980 cert=self.client_auth_cert, 

1981 auth=self.auth, 

1982 timeout=self.request_timeout) 

1983 

1984 # store a copy of the request 

1985 self._request = response.request 

1986 

1987 # critical error: HTTP request resulted in an error! 

1988 except Exception as err: 

1989 # raise OTRSHTTPError("get http") 

1990 raise HTTPError("Failed to access OTRS. Check Hostname, Proxy, SSL Certificate!\n" 

1991 f"Error with http communication: {err}") 

1992 

1993 else: 

1994 

1995 headers.update({"Content-Type": "application/json"}) 

1996 

1997 json_payload = json.dumps(payload) 

1998 

1999 # print("sending {0} to {1} as {2}".format(payload, url, http_method.upper())) 

2000 try: 

2001 response = requests.request(http_method.upper(), 

2002 url, 

2003 headers=headers, 

2004 data=json_payload, 

2005 proxies=self.proxies, 

2006 verify=self.https_verify, 

2007 cert=self.client_auth_cert, 

2008 auth=self.auth, 

2009 timeout=self.request_timeout) 

2010 

2011 # store a copy of the request 

2012 self._request = response.request 

2013 

2014 # critical error: HTTP request resulted in an error! 

2015 except Exception as err: 

2016 # raise OTRSHTTPError("get http") 

2017 raise HTTPError("Failed to access OTRS. Check Hostname, Proxy, SSL Certificate!\n" 

2018 f"Error with http communication: {err}") 

2019 

2020 if not response.status_code == 200: 

2021 raise HTTPError("Received HTTP Error. Check Hostname and WebServiceName.\n" 

2022 f"HTTP Status Code: {response.status_code}\n" 

2023 f"HTTP Message: {response.content}") 

2024 return response 

2025 

2026 def _parse_and_validate_response(self, response): 

2027 """_parse_and_validate_response 

2028 

2029 Args: 

2030 response (requests.Response): result of _send_request 

2031 

2032 Raises: 

2033 OTRSAPIError 

2034 NotImplementedError 

2035 ResponseParseError 

2036 

2037 Returns: 

2038 **bool**: **True** if successful 

2039 

2040 """ 

2041 

2042 if not isinstance(response, requests.models.Response): 

2043 raise ValueError("requests.Response object expected!") 

2044 

2045 if self.operation not in self.ws_config.keys(): 

2046 raise ValueError("invalid operation") 

2047 

2048 # clear data from Client 

2049 self.result = None 

2050 self._result_error = False 

2051 

2052 # get and set new data 

2053 self.result_json = response.json() 

2054 self._result_status_code = response.status_code 

2055 self._result_content = response.content 

2056 

2057 # handle TicketSearch operation first. special: empty search result has no "TicketID" 

2058 if self.operation == "TicketSearch": 

2059 if not self.result_json: 

2060 self.result = [] 

2061 return True 

2062 if self.result_json.get(self._result_type, None): 

2063 self.result = self.result_json['TicketID'] 

2064 return True 

2065 

2066 # now handle SessionGet operation 

2067 if self.operation in ["SessionGet"]: 

2068 # For SessionGet the "Result" that is returned is different when legacy session 

2069 # are used. 

2070 # SessionData was default in OTRS <= 7 / PyOTRS used it as default from v0.1. 

2071 # AccessTokenData was introduced in OTRS 8 - for CustomerUser already in 7 (?!). 

2072 # Unless the dict was modified - use the hardcoded defaults accordingly. 

2073 if self._result_type not in ["AccessTokenData", "SessionData"]: 

2074 session_result_type = self._result_type 

2075 else: 

2076 if self.use_legacy_sessions: 

2077 session_result_type = "SessionData" 

2078 else: 

2079 session_result_type = "AccessTokenData" 

2080 

2081 _session_data = self.result_json.get(session_result_type, None) 

2082 if _session_data: # received SessionData -> Session ID is valid 

2083 self._result_error = False 

2084 self.result = self.result_json[session_result_type] 

2085 return True 

2086 elif self.result_json["Error"]["ErrorCode"] == "SessionGet.SessionInvalid": 

2087 return False 

2088 else: 

2089 raise APIError("Failed to access OTRS API.\n" 

2090 "OTRS Error Code: {}\nOTRS Error Message: {}" 

2091 "".format(self.result_json["Error"]["ErrorCode"], 

2092 self.result_json["Error"]["ErrorMessage"])) 

2093 

2094 # handle Link operations; Add, Delete, DeleteAll return: {"Success":1} 

2095 if self.operation in ["LinkAdd", "LinkDelete", "LinkDeleteAll"]: 

2096 if self.result_json.get("Success", None) == 1: 

2097 return True 

2098 

2099 # LinkList result can be empty 

2100 if self.operation in "LinkList": 

2101 _link_list = self.result_json.get("LinkList", None) 

2102 if not _link_list: 

2103 self.result = None 

2104 return True 

2105 else: 

2106 self.result = _link_list 

2107 return True 

2108 

2109 # now handle other operations 

2110 if self.result_json.get(self._result_type, None): 

2111 self._result_error = False 

2112 self.result = self.result_json[self._result_type] 

2113 elif self.result_json.get("Error", None): 

2114 self._result_error = True 

2115 else: 

2116 self._result_error = True 

2117 # critical error: Unknown response from OTRS API - FAIL NOW! 

2118 raise ResponseParseError("Unknown key in response JSON DICT!") 

2119 

2120 # report error 

2121 if self._result_error: 

2122 raise APIError("Failed to access OTRS API. Check Username and Password! " 

2123 "Session ID expired?! Does Ticket exist?\n" 

2124 "OTRS Error Code: {}\nOTRS Error Message: {}" 

2125 "".format(self.result_json["Error"]["ErrorCode"], 

2126 self.result_json["Error"]["ErrorMessage"])) 

2127 

2128 # for operation TicketGet: parse result list into Ticket object list 

2129 if self.operation == "TicketGet" or self.operation == "TicketGetList": 

2130 self.result = [Ticket(item) for item in self.result_json['Ticket']] 

2131 

2132 return True 

2133 

2134# EOF