| Home | Trees | Indices | Help |
|
|---|
|
|
1 """GNUmed measurements related business objects."""
2
3 # FIXME: use UCUM from Regenstrief Institute
4 #============================================================
5 __author__ = "K.Hilbert <Karsten.Hilbert@gmx.net>"
6 __license__ = "GPL"
7
8
9 import types
10 import sys
11 import logging
12 import codecs
13 import decimal
14
15
16 if __name__ == '__main__':
17 sys.path.insert(0, '../../')
18
19 from Gnumed.pycommon import gmDateTime
20 if __name__ == '__main__':
21 from Gnumed.pycommon import gmLog2
22 from Gnumed.pycommon import gmI18N
23 gmDateTime.init()
24 from Gnumed.pycommon import gmExceptions
25 from Gnumed.pycommon import gmBusinessDBObject
26 from Gnumed.pycommon import gmPG2
27 from Gnumed.pycommon import gmTools
28 from Gnumed.pycommon import gmDispatcher
29 from Gnumed.pycommon import gmHooks
30 from Gnumed.business import gmOrganization
31 from Gnumed.business import gmCoding
32
33
34 _log = logging.getLogger('gm.lab')
35
36 #============================================================
38 """Always relates to the active patient."""
39 gmHooks.run_hook_script(hook = u'after_test_result_modified')
40
41 gmDispatcher.connect(_on_test_result_modified, u'clin.test_result_mod_db')
42
43 #============================================================
45 """Represents one test org/lab."""
46 _cmd_fetch_payload = u"""SELECT * FROM clin.v_test_orgs WHERE pk_test_org = %s"""
47 _cmds_store_payload = [
48 u"""UPDATE clin.test_org SET
49 fk_org_unit = %(pk_org_unit)s,
50 contact = gm.nullify_empty_string(%(test_org_contact)s),
51 comment = gm.nullify_empty_string(%(comment)s)
52 WHERE
53 pk = %(pk_test_org)s
54 AND
55 xmin = %(xmin_test_org)s
56 RETURNING
57 xmin AS xmin_test_org
58 """
59 ]
60 _updatable_fields = [
61 u'pk_org_unit',
62 u'test_org_contact',
63 u'comment'
64 ]
65 #------------------------------------------------------------
67
68 if name is None:
69 name = u'unassigned lab'
70
71 # get org unit
72 if pk_org_unit is None:
73 org = gmOrganization.org_exists(organization = name)
74 if org is None:
75 org = gmOrganization.create_org (
76 organization = name,
77 category = u'Laboratory'
78 )
79 org_unit = gmOrganization.create_org_unit (
80 pk_organization = org['pk_org'],
81 unit = name
82 )
83 pk_org_unit = org_unit['pk_org_unit']
84
85 # test org exists ?
86 args = {'pk_unit': pk_org_unit}
87 cmd = u'SELECT pk_test_org FROM clin.v_test_orgs WHERE pk_org_unit = %(pk_unit)s'
88 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
89
90 if len(rows) == 0:
91 cmd = u'INSERT INTO clin.test_org (fk_org_unit) VALUES (%(pk_unit)s) RETURNING pk'
92 rows, idx = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], return_data = True)
93
94 test_org = cTestOrg(aPK_obj = rows[0][0])
95 if comment is not None:
96 comment = comment.strip()
97 test_org['comment'] = comment
98 test_org.save()
99
100 return test_org
101 #------------------------------------------------------------
103 args = {'pk': test_org}
104 cmd = u"""
105 DELETE FROM clin.test_org
106 WHERE
107 pk = %(pk)s
108 AND
109 NOT EXISTS (SELECT 1 FROM clin.lab_request WHERE fk_test_org = %(pk)s LIMIT 1)
110 AND
111 NOT EXISTS (SELECT 1 FROM clin.test_type WHERE fk_test_org = %(pk)s LIMIT 1)
112 """
113 gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
114 #------------------------------------------------------------
116 cmd = u'SELECT * FROM clin.v_test_orgs ORDER BY %s' % order_by
117 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx = True)
118 return [ cTestOrg(row = {'pk_field': 'pk_test_org', 'data': r, 'idx': idx}) for r in rows ]
119
120 #============================================================
121 # test panels / profiles
122 #------------------------------------------------------------
123 _SQL_get_test_panels = u"SELECT * FROM clin.v_test_panels WHERE %s"
124
126 """Represents a grouping/listing of tests into a panel."""
127
128 _cmd_fetch_payload = _SQL_get_test_panels % u"pk_test_panel = %s"
129 _cmds_store_payload = [
130 u"""
131 UPDATE clin.test_panel SET
132 description = gm.nullify_empty_string(%(description)s),
133 comment = gm.nullify_empty_string(%(comment)s),
134 fk_test_types = %(pk_test_types)s
135 WHERE
136 pk = %(pk_test_panel)s
137 AND
138 xmin = %(xmin_test_panel)s
139 RETURNING
140 xmin AS xmin_test_panel
141 """
142 ]
143 _updatable_fields = [
144 u'description',
145 u'comment',
146 u'pk_test_types'
147 ]
148 #--------------------------------------------------------
150 """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
151 cmd = u"INSERT INTO clin.lnk_code2tst_pnl (fk_item, fk_generic_code) values (%(tp)s, %(code)s)"
152 args = {
153 'tp': self._payload[self._idx['pk_test_panel']],
154 'code': pk_code
155 }
156 rows, idx = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
157 return True
158 #--------------------------------------------------------
160 """<pk_code> must be a value from ref.coding_system_root.pk_coding_system (clin.lnk_code2item_root.fk_generic_code)"""
161 cmd = u"DELETE FROM clin.lnk_code2tst_pnl WHERE fk_item = %(tp)s AND fk_generic_code = %(code)s"
162 args = {
163 'tp': self._payload[self._idx['pk_test_panel']],
164 'code': pk_code
165 }
166 rows, idx = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
167 return True
168 #--------------------------------------------------------
169 # properties
170 #--------------------------------------------------------
172 if self._payload[self._idx['pk_test_types']] is None:
173 return None
174
175 rows, idx = gmPG2.run_ro_queries (
176 queries = [{
177 'cmd': _SQL_get_test_types % u'pk_test_type IN %(pks)s ORDER BY unified_abbrev',
178 'args': {'pks': tuple(self._payload[self._idx['pk_test_types']])}
179 }],
180 get_col_idx = True
181 )
182 return [ cMeasurementType(row = {'data': r, 'idx': idx, 'pk_field': 'pk_test_type'}) for r in rows ]
183
184 test_types = property(_get_test_types, lambda x:x)
185 #--------------------------------------------------------
187 if len(self._payload[self._idx['pk_generic_codes']]) == 0:
188 return []
189
190 cmd = gmCoding._SQL_get_generic_linked_codes % u'pk_generic_code IN %(pks)s'
191 args = {'pks': tuple(self._payload[self._idx['pk_generic_codes']])}
192 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
193 return [ gmCoding.cGenericLinkedCode(row = {'data': r, 'idx': idx, 'pk_field': 'pk_lnk_code2item'}) for r in rows ]
194
196 queries = []
197 # remove all codes
198 if len(self._payload[self._idx['pk_generic_codes']]) > 0:
199 queries.append ({
200 'cmd': u'DELETE FROM clin.lnk_code2tst_pnl WHERE fk_item = %(tp)s AND fk_generic_code IN %(codes)s',
201 'args': {
202 'tp': self._payload[self._idx['pk_test_panel']],
203 'codes': tuple(self._payload[self._idx['pk_generic_codes']])
204 }
205 })
206 # add new codes
207 for pk_code in pk_codes:
208 queries.append ({
209 'cmd': u'INSERT INTO clin.lnk_code2test_panel (fk_item, fk_generic_code) VALUES (%(tp)s, %(pk_code)s)',
210 'args': {
211 'tp': self._payload[self._idx['pk_test_panel']],
212 'pk_code': pk_code
213 }
214 })
215 if len(queries) == 0:
216 return
217 # run it all in one transaction
218 rows, idx = gmPG2.run_rw_queries(queries = queries)
219 return
220
221 generic_codes = property(_get_generic_codes, _set_generic_codes)
222 #--------------------------------------------------------
224 txt = _('Test panel "%s" [#%s]\n') % (
225 self._payload[self._idx['description']],
226 self._payload[self._idx['pk_test_panel']]
227 )
228
229 if self._payload[self._idx['comment']] is not None:
230 txt += u'\n'
231 txt += gmTools.wrap (
232 text = self._payload[self._idx['comment']],
233 width = 50,
234 initial_indent = u' ',
235 subsequent_indent = u' '
236 )
237 txt += u'\n'
238
239 tts = self.test_types
240 if tts is not None:
241 txt += u'\n'
242 txt += _('Included test types:\n')
243 for tt in tts:
244 txt += u' %s: %s\n' % (
245 tt['abbrev'],
246 tt['name']
247 )
248
249 codes = self.generic_codes
250 if len(codes) > 0:
251 txt += u'\n'
252 for c in codes:
253 txt += u'%s %s: %s (%s - %s)\n' % (
254 (u' ' * left_margin),
255 c['code'],
256 c['term'],
257 c['name_short'],
258 c['version']
259 )
260
261 return txt
262 #------------------------------------------------------------
264 if order_by is None:
265 order_by = u'true'
266 else:
267 order_by = u'true ORDER BY %s' % order_by
268
269 cmd = _SQL_get_test_panels % order_by
270 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx = True)
271 return [ cTestPanel(row = {'data': r, 'idx': idx, 'pk_field': 'pk_test_panel'}) for r in rows ]
272 #------------------------------------------------------------
274
275 args = {u'desc': description.strip()}
276 cmd = u"""
277 INSERT INTO clin.test_panel (description)
278 VALUES (gm.nullify_empty_string(%(desc)s))
279 RETURNING pk
280 """
281 rows, idx = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], return_data = True, get_col_idx = False)
282
283 return cTestPanel(aPK_obj = rows[0]['pk'])
284 #------------------------------------------------------------
286 args = {'pk': pk}
287 cmd = u"DELETE FROM clin.test_panel WHERE pk = %(pk)s"
288 gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
289 return True
290
291 #============================================================
293 """Represents one meta test type under which actual test types can be aggregated."""
294
295 _cmd_fetch_payload = u"""select * from clin.meta_test_type where pk = %s"""
296
297 _cmds_store_payload = []
298
299 _updatable_fields = []
300 #--------------------------------------------------------
302 txt = _('Meta (%s=aggregate) test type [#%s]\n\n') % (gmTools.u_sum, self._payload[self._idx['pk']])
303 txt += _(' Name: %s (%s)\n') % (
304 self._payload[self._idx['abbrev']],
305 self._payload[self._idx['name']]
306 )
307 if self._payload[self._idx['loinc']] is not None:
308 txt += u' LOINC: %s (%s)\n' % self._payload[self._idx['loinc']]
309 if self._payload[self._idx['loinc']] is not None:
310 txt += _(' Comment: %s\n') % self._payload[self._idx['comment']]
311 if with_tests:
312 ttypes = self.included_test_types
313 if len(ttypes) > 0:
314 txt += _(' Aggregates the following test types:\n')
315 for ttype in ttypes:
316 txt += u' %s (%s)%s%s [#%s]\n' % (
317 ttype['abbrev'],
318 ttype['name'],
319 gmTools.coalesce(ttype['conversion_unit'], u'', ', %s'),
320 gmTools.coalesce(ttype['loinc'], u'', u', LOINC: %s'),
321 ttype['pk_test_type']
322 )
323 if patient is not None:
324 txt += u'\n'
325 most_recent = self.get_most_recent_result(patient = patient)
326 if most_recent is not None:
327 txt += _(' Most recent (%s): %s%s%s') % (
328 most_recent['clin_when'].strftime('%Y %b %d'),
329 most_recent['unified_val'],
330 gmTools.coalesce(most_recent['val_unit'], u'', u' %s'),
331 gmTools.coalesce(most_recent['abnormality_indicator'], u'', u' (%s)')
332 )
333 oldest = self.get_oldest_result(patient = patient)
334 if oldest is not None:
335 txt += u'\n'
336 txt += _(' Oldest (%s): %s%s%s') % (
337 oldest['clin_when'].strftime('%Y %b %d'),
338 oldest['unified_val'],
339 gmTools.coalesce(oldest['val_unit'], u'', u' %s'),
340 gmTools.coalesce(oldest['abnormality_indicator'], u'', u' (%s)')
341 )
342 return txt
343
344 #--------------------------------------------------------
346 args = {
347 'pat': patient,
348 'mttyp': self._payload[self._idx['pk']]
349 }
350 cmd = u"""
351 SELECT * FROM clin.v_test_results
352 WHERE
353 pk_patient = %(pat)s
354 AND
355 pk_meta_test_type = %(mttyp)s
356 ORDER BY clin_when DESC
357 LIMIT 1"""
358 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
359 if len(rows) == 0:
360 return None
361
362 return cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': rows[0]})
363
364 #--------------------------------------------------------
366 args = {
367 'pat': patient,
368 'mttyp': self._payload[self._idx['pk']]
369 }
370 cmd = u"""
371 SELECT * FROM clin.v_test_results
372 WHERE
373 pk_patient = %(pat)s
374 AND
375 pk_meta_test_type = %(mttyp)s
376 ORDER BY clin_when
377 LIMIT 1"""
378 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
379 if len(rows) == 0:
380 return None
381
382 return cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': rows[0]})
383
384 #--------------------------------------------------------
385 # properties
386 #--------------------------------------------------------
388 cmd = _SQL_get_test_types % u'pk_meta_test_type = %(pk_meta)s'
389 args = {u'pk_meta': self._payload[self._idx['pk']]}
390 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
391 return [ cMeasurementType(row = {'pk_field': 'pk_test_type', 'data': r, 'idx': idx}) for r in rows ]
392
393 included_test_types = property(_get_included_test_types, lambda x:x)
394
395 #------------------------------------------------------------
397 cmd = u'delete from clin.meta_test_type where pk = %(pk)s'
398 args = {'pk': meta_type}
399 gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
400
401 #------------------------------------------------------------
403 cmd = u'select * from clin.meta_test_type'
404 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx = True)
405 return [ cMetaTestType(row = {'pk_field': 'pk', 'data': r, 'idx': idx}) for r in rows ]
406
407 #============================================================
408 _SQL_get_test_types = u"SELECT * FROM clin.v_test_types WHERE %s"
409
411 """Represents one test result type."""
412
413 _cmd_fetch_payload = _SQL_get_test_types % u"pk_test_type = %s"
414
415 _cmds_store_payload = [
416 u"""UPDATE clin.test_type SET
417 abbrev = gm.nullify_empty_string(%(abbrev)s),
418 name = gm.nullify_empty_string(%(name)s),
419 loinc = gm.nullify_empty_string(%(loinc)s),
420 comment = gm.nullify_empty_string(%(comment_type)s),
421 conversion_unit = gm.nullify_empty_string(%(conversion_unit)s),
422 fk_test_org = %(pk_test_org)s,
423 fk_meta_test_type = %(pk_meta_test_type)s
424 WHERE
425 pk = %(pk_test_type)s
426 AND
427 xmin = %(xmin_test_type)s
428 RETURNING
429 xmin AS xmin_test_type"""
430 ]
431
432 _updatable_fields = [
433 'abbrev',
434 'name',
435 'loinc',
436 'comment_type',
437 'conversion_unit',
438 'pk_test_org',
439 'pk_meta_test_type'
440 ]
441 #--------------------------------------------------------
442 # properties
443 #--------------------------------------------------------
445 cmd = u'SELECT EXISTS(SELECT 1 FROM clin.test_result WHERE fk_type = %(pk_type)s)'
446 args = {'pk_type': self._payload[self._idx['pk_test_type']]}
447 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
448 return rows[0][0]
449
450 in_use = property(_get_in_use, lambda x:x)
451 #--------------------------------------------------------
453 results = get_most_recent_results (
454 test_type = self._payload[self._idx['pk_test_type']],
455 loinc = None,
456 no_of_results = no_of_results,
457 patient = patient
458 )
459 if results is None:
460 if self._payload[self._idx['loinc']] is not None:
461 results = get_most_recent_results (
462 test_type = None,
463 loinc = self._payload[self._idx['loinc']],
464 no_of_results = no_of_results,
465 patient = patient
466 )
467 return results
468
469 #--------------------------------------------------------
471 result = get_oldest_result (
472 test_type = self._payload[self._idx['pk_test_type']],
473 loinc = None,
474 patient = patient
475 )
476 if result is None:
477 if self._payload[self._idx['loinc']] is not None:
478 result = get_oldest_result (
479 test_type = None,
480 loinc = self._payload[self._idx['loinc']],
481 patient = patient
482 )
483 return result
484
485 #--------------------------------------------------------
487 if self._payload[self._idx['pk_test_panels']] is None:
488 return None
489
490 return [ cTestPanel(aPK_obj = pk) for pk in self._payload[self._idx['pk_test_panels']] ]
491
492 test_panels = property(_get_test_panels, lambda x:x)
493
494 #--------------------------------------------------------
496 if real_one_only is False:
497 return cMetaTestType(aPK_obj = self._payload[self._idx['pk_meta_test_type']])
498 if self._payload[self._idx['is_fake_meta_type']]:
499 return None
500 return cMetaTestType(aPK_obj = self._payload[self._idx['pk_meta_test_type']])
501
502 meta_test_type = property(get_meta_test_type, lambda x:x)
503 #--------------------------------------------------------
505 """Returns the closest test result which does have normal range information.
506
507 - needs <unit>
508 - if <timestamp> is None it will assume now() and thus return the most recent
509 """
510 if timestamp is None:
511 timestamp = gmDateTime.pydt_now_here()
512 cmd = u"""
513 SELECT * FROM clin.v_test_results
514 WHERE
515 pk_test_type = %(pk_type)s
516 AND
517 val_unit = %(unit)s
518 AND
519 (
520 (val_normal_min IS NOT NULL)
521 OR
522 (val_normal_max IS NOT NULL)
523 OR
524 (val_normal_range IS NOT NULL)
525 )
526 ORDER BY
527 CASE
528 WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s
529 ELSE %(clin_when)s - clin_when
530 END
531 LIMIT 1"""
532 args = {
533 u'pk_type': self._payload[self._idx['pk_test_type']],
534 u'unit': unit,
535 u'clin_when': timestamp
536 }
537 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
538 if len(rows) == 0:
539 return None
540 r = rows[0]
541 return cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': r})
542
543 #--------------------------------------------------------
545 """Returns the closest test result which does have target range information.
546
547 - needs <unit>
548 - needs <patient> (as target will be per-patient)
549 - if <timestamp> is None it will assume now() and thus return the most recent
550 """
551 if timestamp is None:
552 timestamp = gmDateTime.pydt_now_here()
553 cmd = u"""
554 SELECT * FROM clin.v_test_results
555 WHERE
556 pk_test_type = %(pk_type)s
557 AND
558 val_unit = %(unit)s
559 AND
560 pk_patient = %(pat)s
561 AND
562 (
563 (val_target_min IS NOT NULL)
564 OR
565 (val_target_max IS NOT NULL)
566 OR
567 (val_target_range IS NOT NULL)
568 )
569 ORDER BY
570 CASE
571 WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s
572 ELSE %(clin_when)s - clin_when
573 END
574 LIMIT 1"""
575 args = {
576 u'pk_type': self._payload[self._idx['pk_test_type']],
577 u'unit': unit,
578 u'pat': patient,
579 u'clin_when': timestamp
580 }
581 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
582 if len(rows) == 0:
583 return None
584 r = rows[0]
585 return cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': r})
586
587 #--------------------------------------------------------
589 """Returns the unit of the closest test result.
590
591 - if <timestamp> is None it will assume now() and thus return the most recent
592 """
593 if timestamp is None:
594 timestamp = gmDateTime.pydt_now_here()
595 cmd = u"""
596 SELECT val_unit FROM clin.v_test_results
597 WHERE
598 pk_test_type = %(pk_type)s
599 AND
600 val_unit IS NOT NULL
601 ORDER BY
602 CASE
603 WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s
604 ELSE %(clin_when)s - clin_when
605 END
606 LIMIT 1"""
607 args = {
608 u'pk_type': self._payload[self._idx['pk_test_type']],
609 u'clin_when': timestamp
610 }
611 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
612 if len(rows) == 0:
613 return None
614 return rows[0]['val_unit']
615
616 temporally_closest_unit = property(get_temporally_closest_unit, lambda x:x)
617
618 #--------------------------------------------------------
620 tt = u''
621 tt += _('Test type "%s" (%s) [#%s]\n') % (
622 self._payload[self._idx['name']],
623 self._payload[self._idx['abbrev']],
624 self._payload[self._idx['pk_test_type']]
625 )
626 tt += u'\n'
627 tt += gmTools.coalesce(self._payload[self._idx['loinc']], u'', u' LOINC: %s\n')
628 tt += gmTools.coalesce(self._payload[self._idx['conversion_unit']], u'', _(' Conversion unit: %s\n'))
629 tt += gmTools.coalesce(self._payload[self._idx['comment_type']], u'', _(' Comment: %s\n'))
630
631 tt += u'\n'
632 tt += _('Lab details:\n')
633 tt += _(' Name: %s\n') % self._payload[self._idx['name_org']]
634 tt += gmTools.coalesce(self._payload[self._idx['contact_org']], u'', _(' Contact: %s\n'))
635 tt += gmTools.coalesce(self._payload[self._idx['comment_org']], u'', _(' Comment: %s\n'))
636
637 if self._payload[self._idx['is_fake_meta_type']] is False:
638 tt += u'\n'
639 tt += _('Aggregated under meta type:\n')
640 tt += _(' Name: %s - %s [#%s]\n') % (
641 self._payload[self._idx['abbrev_meta']],
642 self._payload[self._idx['name_meta']],
643 self._payload[self._idx['pk_meta_test_type']]
644 )
645 tt += gmTools.coalesce(self._payload[self._idx['loinc_meta']], u'', u' LOINC: %s\n')
646 tt += gmTools.coalesce(self._payload[self._idx['comment_meta']], u'', _(' Comment: %s\n'))
647
648 panels = self.test_panels
649 if panels is not None:
650 tt += u'\n'
651 tt += _('Listed in test panels:\n')
652 for panel in panels:
653 tt += _(' Panel "%s" [#%s]\n') % (
654 panel['description'],
655 panel['pk_test_panel']
656 )
657
658 if patient is not None:
659 tt += u'\n'
660 result = self.get_most_recent_results(patient = patient, no_of_results = 1)
661 if result is not None:
662 tt += _(' Most recent (%s): %s%s%s') % (
663 result['clin_when'].strftime('%Y-%m-%d'),
664 result['unified_val'],
665 gmTools.coalesce(result['val_unit'], u'', u' %s'),
666 gmTools.coalesce(result['abnormality_indicator'], u'', u' (%s)')
667 )
668 result = self.get_oldest_result(patient = patient)
669 if result is not None:
670 tt += u'\n'
671 tt += _(' Oldest (%s): %s%s%s') % (
672 result['clin_when'].strftime('%Y-%m-%d'),
673 result['unified_val'],
674 gmTools.coalesce(result['val_unit'], u'', u' %s'),
675 gmTools.coalesce(result['abnormality_indicator'], u'', u' (%s)')
676 )
677
678 return tt
679
680 #------------------------------------------------------------
682 cmd = u'select * from clin.v_test_types %s' % gmTools.coalesce(order_by, u'', u'order by %s')
683 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}], get_col_idx = True)
684 return [ cMeasurementType(row = {'pk_field': 'pk_test_type', 'data': r, 'idx': idx}) for r in rows ]
685
686 #------------------------------------------------------------
688
689 if (abbrev is None) and (name is None):
690 raise ValueError('must have <abbrev> and/or <name> set')
691
692 where_snippets = []
693
694 if lab is None:
695 where_snippets.append('pk_test_org IS NULL')
696 else:
697 try:
698 int(lab)
699 where_snippets.append('pk_test_org = %(lab)s')
700 except (TypeError, ValueError):
701 where_snippets.append('pk_test_org = (SELECT pk_test_org FROM clin.v_test_orgs WHERE unit = %(lab)s)')
702
703 if abbrev is not None:
704 where_snippets.append('abbrev = %(abbrev)s')
705
706 if name is not None:
707 where_snippets.append('name = %(name)s')
708
709 where_clause = u' and '.join(where_snippets)
710 cmd = u"select * from clin.v_test_types where %s" % where_clause
711 args = {'lab': lab, 'abbrev': abbrev, 'name': name}
712
713 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
714
715 if len(rows) == 0:
716 return None
717
718 tt = cMeasurementType(row = {'pk_field': 'pk_test_type', 'data': rows[0], 'idx': idx})
719 return tt
720
721 #------------------------------------------------------------
723 cmd = u'delete from clin.test_type where pk = %(pk)s'
724 args = {'pk': measurement_type}
725 gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
726
727 #------------------------------------------------------------
729 """Create or get test type."""
730
731 ttype = find_measurement_type(lab = lab, abbrev = abbrev, name = name)
732 # found ?
733 if ttype is not None:
734 return ttype
735
736 _log.debug('creating test type [%s:%s:%s:%s]', lab, abbrev, name, unit)
737
738 # not found, so create it
739 if unit is None:
740 _log.error('need <unit> to create test type: %s:%s:%s:%s' % (lab, abbrev, name, unit))
741 raise ValueError('need <unit> to create test type')
742
743 # make query
744 cols = []
745 val_snippets = []
746 vals = {}
747
748 # lab
749 if lab is None:
750 lab = create_test_org()['pk_test_org']
751
752 cols.append('fk_test_org')
753 try:
754 vals['lab'] = int(lab)
755 val_snippets.append('%(lab)s')
756 except:
757 vals['lab'] = lab
758 val_snippets.append('(SELECT pk_test_org FROM clin.v_test_orgs WHERE unit = %(lab)s)')
759
760 # code
761 cols.append('abbrev')
762 val_snippets.append('%(abbrev)s')
763 vals['abbrev'] = abbrev
764
765 # unit
766 cols.append('conversion_unit')
767 val_snippets.append('%(unit)s')
768 vals['unit'] = unit
769
770 # name
771 if name is not None:
772 cols.append('name')
773 val_snippets.append('%(name)s')
774 vals['name'] = name
775
776 col_clause = u', '.join(cols)
777 val_clause = u', '.join(val_snippets)
778 queries = [
779 {'cmd': u'insert into clin.test_type (%s) values (%s)' % (col_clause, val_clause), 'args': vals},
780 {'cmd': u"select * from clin.v_test_types where pk_test_type = currval(pg_get_serial_sequence('clin.test_type', 'pk'))"}
781 ]
782 rows, idx = gmPG2.run_rw_queries(queries = queries, get_col_idx = True, return_data = True)
783 ttype = cMeasurementType(row = {'pk_field': 'pk_test_type', 'data': rows[0], 'idx': idx})
784
785 return ttype
786
787 #============================================================
789 """Represents one test result."""
790
791 _cmd_fetch_payload = u"select * from clin.v_test_results where pk_test_result = %s"
792
793 _cmds_store_payload = [
794 u"""update clin.test_result set
795 clin_when = %(clin_when)s,
796 narrative = nullif(trim(%(comment)s), ''),
797 val_num = %(val_num)s,
798 val_alpha = nullif(trim(%(val_alpha)s), ''),
799 val_unit = nullif(trim(%(val_unit)s), ''),
800 val_normal_min = %(val_normal_min)s,
801 val_normal_max = %(val_normal_max)s,
802 val_normal_range = nullif(trim(%(val_normal_range)s), ''),
803 val_target_min = %(val_target_min)s,
804 val_target_max = %(val_target_max)s,
805 val_target_range = nullif(trim(%(val_target_range)s), ''),
806 abnormality_indicator = nullif(trim(%(abnormality_indicator)s), ''),
807 norm_ref_group = nullif(trim(%(norm_ref_group)s), ''),
808 note_test_org = nullif(trim(%(note_test_org)s), ''),
809 material = nullif(trim(%(material)s), ''),
810 material_detail = nullif(trim(%(material_detail)s), ''),
811 fk_intended_reviewer = %(pk_intended_reviewer)s,
812 fk_encounter = %(pk_encounter)s,
813 fk_episode = %(pk_episode)s,
814 fk_type = %(pk_test_type)s,
815 fk_request = %(pk_request)s
816 where
817 pk = %(pk_test_result)s and
818 xmin = %(xmin_test_result)s""",
819 u"""select xmin_test_result from clin.v_test_results where pk_test_result = %(pk_test_result)s"""
820 ]
821
822 _updatable_fields = [
823 'clin_when',
824 'comment',
825 'val_num',
826 'val_alpha',
827 'val_unit',
828 'val_normal_min',
829 'val_normal_max',
830 'val_normal_range',
831 'val_target_min',
832 'val_target_max',
833 'val_target_range',
834 'abnormality_indicator',
835 'norm_ref_group',
836 'note_test_org',
837 'material',
838 'material_detail',
839 'pk_intended_reviewer',
840 'pk_encounter',
841 'pk_episode',
842 'pk_test_type',
843 'pk_request'
844 ]
845 #--------------------------------------------------------
846 # def format_old(self, with_review=True, with_comments=True, date_format='%Y-%m-%d %H:%M'):
847 #
848 # lines = []
849 #
850 # lines.append(u' %s %s (%s): %s %s%s' % (
851 # self._payload[self._idx['clin_when']].strftime(date_format),
852 # self._payload[self._idx['unified_abbrev']],
853 # self._payload[self._idx['unified_name']],
854 # self._payload[self._idx['unified_val']],
855 # self._payload[self._idx['val_unit']],
856 # gmTools.coalesce(self._payload[self._idx['abnormality_indicator']], u'', u' (%s)')
857 # ))
858 #
859 # if with_comments:
860 # if gmTools.coalesce(self._payload[self._idx['comment']], u'').strip() != u'':
861 # lines.append(_(' Doc: %s') % self._payload[self._idx['comment']].strip())
862 # if gmTools.coalesce(self._payload[self._idx['note_test_org']], u'').strip() != u'':
863 # lines.append(_(' MTA: %s') % self._payload[self._idx['note_test_org']].strip())
864 #
865 # if with_review:
866 # if self._payload[self._idx['reviewed']]:
867 # if self._payload[self._idx['is_clinically_relevant']]:
868 # lines.append(u' %s %s: %s' % (
869 # self._payload[self._idx['last_reviewer']],
870 # self._payload[self._idx['last_reviewed']].strftime('%Y-%m-%d %H:%M'),
871 # gmTools.bool2subst (
872 # self._payload[self._idx['is_technically_abnormal']],
873 # _('abnormal and relevant'),
874 # _('normal but relevant')
875 # )
876 # ))
877 # else:
878 # lines.append(_(' unreviewed'))
879 #
880 # return lines
881 #--------------------------------------------------------
882 - def format(self, with_review=True, with_evaluation=True, with_ranges=True, with_episode=True, with_type_details=True, date_format='%Y %b %d %H:%M'):
883
884 # FIXME: add battery, request details
885
886 has_normal_min_or_max = (
887 self._payload[self._idx['val_normal_min']] is not None
888 ) or (
889 self._payload[self._idx['val_normal_max']] is not None
890 )
891 if has_normal_min_or_max:
892 normal_min_max = u'%s - %s' % (
893 gmTools.coalesce(self._payload[self._idx['val_normal_min']], u'?'),
894 gmTools.coalesce(self._payload[self._idx['val_normal_max']], u'?')
895 )
896 else:
897 normal_min_max = u''
898
899 has_clinical_min_or_max = (
900 self._payload[self._idx['val_target_min']] is not None
901 ) or (
902 self._payload[self._idx['val_target_max']] is not None
903 )
904 if has_clinical_min_or_max:
905 clinical_min_max = u'%s - %s' % (
906 gmTools.coalesce(self._payload[self._idx['val_target_min']], u'?'),
907 gmTools.coalesce(self._payload[self._idx['val_target_max']], u'?')
908 )
909 else:
910 clinical_min_max = u''
911
912 # header
913 tt = _(u'Result from %s \n') % gmDateTime.pydt_strftime (
914 self._payload[self._idx['clin_when']],
915 date_format
916 )
917
918 # basics
919 tt += u' ' + _(u'Type: "%(name)s" (%(abbr)s) [#%(pk_type)s]\n') % ({
920 'name': self._payload[self._idx['name_tt']],
921 'abbr': self._payload[self._idx['abbrev_tt']],
922 'pk_type': self._payload[self._idx['pk_test_type']]
923 })
924 tt += u' ' + _(u'Result: %(val)s%(unit)s%(ind)s [#%(pk_result)s]\n') % ({
925 'val': self._payload[self._idx['unified_val']],
926 'unit': gmTools.coalesce(self._payload[self._idx['val_unit']], u'', u' %s'),
927 'ind': gmTools.coalesce(self._payload[self._idx['abnormality_indicator']], u'', u' (%s)'),
928 'pk_result': self._payload[self._idx['pk_test_result']]
929 })
930 tmp = (u'%s%s' % (
931 gmTools.coalesce(self._payload[self._idx['name_test_org']], u''),
932 gmTools.coalesce(self._payload[self._idx['contact_test_org']], u'', u' (%s)'),
933 )).strip()
934 if tmp != u'':
935 tt += u' ' + _(u'Source: %s\n') % tmp
936 tt += u'\n'
937
938 if with_evaluation:
939 norm_eval = None
940 if self._payload[self._idx['val_num']] is not None:
941 # 1) normal range
942 # lowered ?
943 if (self._payload[self._idx['val_normal_min']] is not None) and (self._payload[self._idx['val_num']] < self._payload[self._idx['val_normal_min']]):
944 try:
945 percent = (self._payload[self._idx['val_num']] * 100) / self._payload[self._idx['val_normal_min']]
946 except ZeroDivisionError:
947 percent = None
948 if percent is not None:
949 if percent < 6:
950 norm_eval = _(u'%.1f %% of the normal lower limit') % percent
951 else:
952 norm_eval = _(u'%.0f %% of the normal lower limit') % percent
953 # raised ?
954 if (self._payload[self._idx['val_normal_max']] is not None) and (self._payload[self._idx['val_num']] > self._payload[self._idx['val_normal_max']]):
955 try:
956 x_times = self._payload[self._idx['val_num']] / self._payload[self._idx['val_normal_max']]
957 except ZeroDivisionError:
958 x_times = None
959 if x_times is not None:
960 if x_times < 10:
961 norm_eval = _(u'%.1f times the normal upper limit') % x_times
962 else:
963 norm_eval = _(u'%.0f times the normal upper limit') % x_times
964 if norm_eval is not None:
965 tt += u' (%s)\n' % norm_eval
966 # #-------------------------------------
967 # # this idea was shot down on the list
968 # #-------------------------------------
969 # # bandwidth of deviation
970 # if None not in [self._payload[self._idx['val_normal_min']], self._payload[self._idx['val_normal_max']]]:
971 # normal_width = self._payload[self._idx['val_normal_max']] - self._payload[self._idx['val_normal_min']]
972 # deviation_from_normal_range = None
973 # # below ?
974 # if self._payload[self._idx['val_num']] < self._payload[self._idx['val_normal_min']]:
975 # deviation_from_normal_range = self._payload[self._idx['val_normal_min']] - self._payload[self._idx['val_num']]
976 # # above ?
977 # elif self._payload[self._idx['val_num']] > self._payload[self._idx['val_normal_max']]:
978 # deviation_from_normal_range = self._payload[self._idx['val_num']] - self._payload[self._idx['val_normal_max']]
979 # if deviation_from_normal_range is None:
980 # try:
981 # times_deviation = deviation_from_normal_range / normal_width
982 # except ZeroDivisionError:
983 # times_deviation = None
984 # if times_deviation is not None:
985 # if times_deviation < 10:
986 # tt += u' (%s)\n' % _(u'deviates by %.1f times of the normal range') % times_deviation
987 # else:
988 # tt += u' (%s)\n' % _(u'deviates by %.0f times of the normal range') % times_deviation
989 # #-------------------------------------
990
991 # 2) clinical target range
992 norm_eval = None
993 # lowered ?
994 if (self._payload[self._idx['val_target_min']] is not None) and (self._payload[self._idx['val_num']] < self._payload[self._idx['val_target_min']]):
995 try:
996 percent = (self._payload[self._idx['val_num']] * 100) / self._payload[self._idx['val_target_min']]
997 except ZeroDivisionError:
998 percent = None
999 if percent is not None:
1000 if percent < 6:
1001 norm_eval = _(u'%.1f %% of the target lower limit') % percent
1002 else:
1003 norm_eval = _(u'%.0f %% of the target lower limit') % percent
1004 # raised ?
1005 if (self._payload[self._idx['val_target_max']] is not None) and (self._payload[self._idx['val_num']] > self._payload[self._idx['val_target_max']]):
1006 try:
1007 x_times = self._payload[self._idx['val_num']] / self._payload[self._idx['val_target_max']]
1008 except ZeroDivisionError:
1009 x_times = None
1010 if x_times is not None:
1011 if x_times < 10:
1012 norm_eval = _(u'%.1f times the target upper limit') % x_times
1013 else:
1014 norm_eval = _(u'%.0f times the target upper limit') % x_times
1015 if norm_eval is not None:
1016 tt += u' (%s)\n' % norm_eval
1017 # #-------------------------------------
1018 # # this idea was shot down on the list
1019 # #-------------------------------------
1020 # # bandwidth of deviation
1021 # if None not in [self._payload[self._idx['val_target_min']], self._payload[self._idx['val_target_max']]]:
1022 # normal_width = self._payload[self._idx['val_target_max']] - self._payload[self._idx['val_target_min']]
1023 # deviation_from_target_range = None
1024 # # below ?
1025 # if self._payload[self._idx['val_num']] < self._payload[self._idx['val_target_min']]:
1026 # deviation_from_target_range = self._payload[self._idx['val_target_min']] - self._payload[self._idx['val_num']]
1027 # # above ?
1028 # elif self._payload[self._idx['val_num']] > self._payload[self._idx['val_target_max']]:
1029 # deviation_from_target_range = self._payload[self._idx['val_num']] - self._payload[self._idx['val_target_max']]
1030 # if deviation_from_target_range is None:
1031 # try:
1032 # times_deviation = deviation_from_target_range / normal_width
1033 # except ZeroDivisionError:
1034 # times_deviation = None
1035 # if times_deviation is not None:
1036 # if times_deviation < 10:
1037 # tt += u' (%s)\n' % _(u'deviates by %.1f times of the target range') % times_deviation
1038 # else:
1039 # tt += u' (%s)\n' % _(u'deviates by %.0f times of the target range') % times_deviation
1040 # #-------------------------------------
1041
1042 if with_ranges:
1043 tt += u' ' + _(u'Standard normal range: %(norm_min_max)s%(norm_range)s \n') % ({
1044 'norm_min_max': normal_min_max,
1045 'norm_range': gmTools.coalesce (
1046 self._payload[self._idx['val_normal_range']],
1047 u'',
1048 gmTools.bool2subst (
1049 has_normal_min_or_max,
1050 u' / %s',
1051 u'%s'
1052 )
1053 )
1054 })
1055 if self._payload[self._idx['norm_ref_group']] is not None:
1056 tt += u' ' + _(u'Reference group: %s\n') % self._payload[self._idx['norm_ref_group']]
1057 tt += u' ' + _(u'Clinical target range: %(clin_min_max)s%(clin_range)s \n') % ({
1058 'clin_min_max': clinical_min_max,
1059 'clin_range': gmTools.coalesce (
1060 self._payload[self._idx['val_target_range']],
1061 u'',
1062 gmTools.bool2subst (
1063 has_clinical_min_or_max,
1064 u' / %s',
1065 u'%s'
1066 )
1067 )
1068 })
1069
1070 # metadata
1071 if self._payload[self._idx['comment']] is not None:
1072 tt += u' ' + _(u'Doc: %s\n') % _(u'\n Doc: ').join(self._payload[self._idx['comment']].split(u'\n'))
1073 if self._payload[self._idx['note_test_org']] is not None:
1074 tt += u' ' + _(u'Lab: %s\n') % _(u'\n Lab: ').join(self._payload[self._idx['note_test_org']].split(u'\n'))
1075 if with_episode:
1076 tt += u' ' + _(u'Episode: %s\n') % self._payload[self._idx['episode']]
1077 if self._payload[self._idx['health_issue']] is not None:
1078 tt += u' ' + _(u'Issue: %s\n') % self._payload[self._idx['health_issue']]
1079 if self._payload[self._idx['material']] is not None:
1080 tt += u' ' + _(u'Material: %s\n') % self._payload[self._idx['material']]
1081 if self._payload[self._idx['material_detail']] is not None:
1082 tt += u' ' + _(u'Details: %s\n') % self._payload[self._idx['material_detail']]
1083 tt += u'\n'
1084
1085 if with_review:
1086 if self._payload[self._idx['reviewed']]:
1087 review = gmDateTime.pydt_strftime (
1088 self._payload[self._idx['last_reviewed']],
1089 date_format
1090 )
1091 else:
1092 review = _('not yet')
1093 tt += _(u'Signed (%(sig_hand)s): %(reviewed)s\n') % ({
1094 'sig_hand': gmTools.u_writing_hand,
1095 'reviewed': review
1096 })
1097 tt += u' ' + _(u'Responsible clinician: %s\n') % gmTools.bool2subst (
1098 self._payload[self._idx['you_are_responsible']],
1099 _('you'),
1100 self._payload[self._idx['responsible_reviewer']]
1101 )
1102 if self._payload[self._idx['reviewed']]:
1103 tt += u' ' + _(u'Last reviewer: %(reviewer)s\n') % ({
1104 'reviewer': gmTools.bool2subst (
1105 self._payload[self._idx['review_by_you']],
1106 _('you'),
1107 gmTools.coalesce(self._payload[self._idx['last_reviewer']], u'?')
1108 )
1109 })
1110 tt += u' ' + _(u' Technically abnormal: %(abnormal)s\n') % ({
1111 'abnormal': gmTools.bool2subst (
1112 self._payload[self._idx['is_technically_abnormal']],
1113 _('yes'),
1114 _('no'),
1115 u'?'
1116 )
1117 })
1118 tt += u' ' + _(u' Clinically relevant: %(relevant)s\n') % ({
1119 'relevant': gmTools.bool2subst (
1120 self._payload[self._idx['is_clinically_relevant']],
1121 _('yes'),
1122 _('no'),
1123 u'?'
1124 )
1125 })
1126 if self._payload[self._idx['review_comment']] is not None:
1127 tt += u' ' + _(u' Comment: %s\n') % self._payload[self._idx['review_comment']].strip()
1128 tt += u'\n'
1129
1130 # type
1131 if with_type_details:
1132 tt += _(u'Test type details:\n')
1133 tt += u' ' + _(u'Grouped under "%(name_meta)s" (%(abbrev_meta)s) [#%(pk_u_type)s]\n') % ({
1134 'name_meta': gmTools.coalesce(self._payload[self._idx['name_meta']], u''),
1135 'abbrev_meta': gmTools.coalesce(self._payload[self._idx['abbrev_meta']], u''),
1136 'pk_u_type': self._payload[self._idx['pk_meta_test_type']]
1137 })
1138 if self._payload[self._idx['comment_tt']] is not None:
1139 tt += u' ' + _(u'Type comment: %s\n') % _(u'\n Type comment:').join(self._payload[self._idx['comment_tt']].split(u'\n'))
1140 if self._payload[self._idx['comment_meta']] is not None:
1141 tt += u' ' + _(u'Group comment: %s\n') % _(u'\n Group comment: ').join(self._payload[self._idx['comment_meta']].split(u'\n'))
1142 tt += u'\n'
1143
1144 if with_review:
1145 tt += _(u'Revisions: %(row_ver)s, last %(mod_when)s by %(mod_by)s.') % ({
1146 'row_ver': self._payload[self._idx['row_version']],
1147 'mod_when': gmDateTime.pydt_strftime(self._payload[self._idx['modified_when']],date_format),
1148 'mod_by': self._payload[self._idx['modified_by']]
1149 })
1150
1151 return tt
1152 #--------------------------------------------------------
1154 """Returns the closest test result which does have normal range information."""
1155 if self._payload[self._idx['val_normal_min']] is not None:
1156 return self
1157 if self._payload[self._idx['val_normal_max']] is not None:
1158 return self
1159 if self._payload[self._idx['val_normal_range']] is not None:
1160 return self
1161 cmd = u"""
1162 SELECT * from clin.v_test_results
1163 WHERE
1164 pk_type = %(pk_type)s
1165 AND
1166 val_unit = %(unit)s
1167 AND
1168 (
1169 (val_normal_min IS NOT NULL)
1170 OR
1171 (val_normal_max IS NOT NULL)
1172 OR
1173 (val_normal_range IS NOT NULL)
1174 )
1175 ORDER BY
1176 CASE
1177 WHEN clin_when > %(clin_when)s THEN clin_when - %(clin_when)s
1178 ELSE %(clin_when)s - clin_when
1179 END
1180 LIMIT 1"""
1181 args = {
1182 u'pk_type': self._payload[self._idx['pk_test_type']],
1183 u'unit': self._payload[self._idx['val_unit']],
1184 u'clin_when': self._payload[self._idx['clin_when']]
1185 }
1186 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
1187 if len(rows) == 0:
1188 return None
1189 return cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': rows[0]})
1190
1191 temporally_closest_normal_range = property(_get_temporally_closest_normal_range, lambda x:x)
1192 #--------------------------------------------------------
1194
1195 has_normal_min_or_max = (
1196 self._payload[self._idx['val_normal_min']] is not None
1197 ) or (
1198 self._payload[self._idx['val_normal_max']] is not None
1199 )
1200 if has_normal_min_or_max:
1201 normal_min_max = u'%s - %s' % (
1202 gmTools.coalesce(self._payload[self._idx['val_normal_min']], u'?'),
1203 gmTools.coalesce(self._payload[self._idx['val_normal_max']], u'?')
1204 )
1205
1206 has_clinical_min_or_max = (
1207 self._payload[self._idx['val_target_min']] is not None
1208 ) or (
1209 self._payload[self._idx['val_target_max']] is not None
1210 )
1211 if has_clinical_min_or_max:
1212 clinical_min_max = u'%s - %s' % (
1213 gmTools.coalesce(self._payload[self._idx['val_target_min']], u'?'),
1214 gmTools.coalesce(self._payload[self._idx['val_target_max']], u'?')
1215 )
1216
1217 if has_clinical_min_or_max:
1218 return _('Target: %(clin_min_max)s%(clin_range)s') % ({
1219 'clin_min_max': clinical_min_max,
1220 'clin_range': gmTools.coalesce (
1221 self._payload[self._idx['val_target_range']],
1222 u'',
1223 gmTools.bool2subst (
1224 has_clinical_min_or_max,
1225 u' / %s',
1226 u'%s'
1227 )
1228 )
1229 })
1230
1231 if has_normal_min_or_max:
1232 return _('Norm: %(norm_min_max)s%(norm_range)s') % ({
1233 'norm_min_max': normal_min_max,
1234 'norm_range': gmTools.coalesce (
1235 self._payload[self._idx['val_normal_range']],
1236 u'',
1237 gmTools.bool2subst (
1238 has_normal_min_or_max,
1239 u' / %s',
1240 u'%s'
1241 )
1242 )
1243 })
1244
1245 if self._payload[self._idx['val_target_range']] is not None:
1246 return _('Target: %s') % self._payload[self._idx['val_target_range']],
1247
1248 if self._payload[self._idx['val_normal_range']] is not None:
1249 return _('Norm: %s') % self._payload[self._idx['val_normal_range']]
1250
1251 return None
1252
1253 formatted_range = property(_get_formatted_range, lambda x:x)
1254 #--------------------------------------------------------
1256 return cMeasurementType(aPK_obj = self._payload[self._idx['pk_test_type']])
1257
1258 test_type = property(_get_test_type, lambda x:x)
1259 #--------------------------------------------------------
1261 # 1) the user is right (review)
1262 if self._payload[self._idx['is_technically_abnormal']] is False:
1263 return False
1264 # 2) the lab is right (result.abnormality_indicator)
1265 indicator = self._payload[self._idx['abnormality_indicator']]
1266 if indicator is not None:
1267 indicator = indicator.strip()
1268 if indicator != u'':
1269 if indicator.strip(u'+') == u'':
1270 return True
1271 if indicator.strip(u'-') == u'':
1272 return False
1273 # 3) non-numerical value ?
1274 if self._payload[self._idx['val_num']] is None:
1275 return None
1276 # 4) the target range is right
1277 target_max = self._payload[self._idx['val_target_max']]
1278 if target_max is not None:
1279 if target_max < self._payload[self._idx['val_num']]:
1280 return True
1281 # 4) the normal range is right
1282 normal_max = self._payload[self._idx['val_normal_max']]
1283 if normal_max is not None:
1284 if normal_max < self._payload[self._idx['val_num']]:
1285 return True
1286 return None
1287
1288 is_considered_elevated = property(_get_is_considered_elevated, lambda x:x)
1289 #--------------------------------------------------------
1291 # 1) the user is right (review)
1292 if self._payload[self._idx['is_technically_abnormal']] is False:
1293 return False
1294 # 2) the lab is right (result.abnormality_indicator)
1295 indicator = self._payload[self._idx['abnormality_indicator']]
1296 if indicator is not None:
1297 indicator = indicator.strip()
1298 if indicator != u'':
1299 if indicator.strip(u'+') == u'':
1300 return False
1301 if indicator.strip(u'-') == u'':
1302 return True
1303 # 3) non-numerical value ?
1304 if self._payload[self._idx['val_num']] is None:
1305 return None
1306 # 4) the target range is right
1307 target_min = self._payload[self._idx['val_target_min']]
1308 if target_min is not None:
1309 if target_min > self._payload[self._idx['val_num']]:
1310 return True
1311 # 4) the normal range is right
1312 normal_min = self._payload[self._idx['val_normal_min']]
1313 if normal_min is not None:
1314 if normal_min > self._payload[self._idx['val_num']]:
1315 return True
1316 return None
1317
1318 is_considered_lowered = property(_get_is_considered_lowered, lambda x:x)
1319 #--------------------------------------------------------
1321 if self.is_considered_lowered is True:
1322 return True
1323 if self.is_considered_elevated is True:
1324 return True
1325 if (self.is_considered_lowered is False) and (self.is_considered_elevated is False):
1326 return False
1327 return self._payload[self._idx['is_technically_abnormal']]
1328
1329 is_considered_abnormal = property(_get_is_considered_abnormal, lambda x:x)
1330 #--------------------------------------------------------
1332 # 1) the user is right
1333 if self._payload[self._idx['is_technically_abnormal']] is False:
1334 return u''
1335 # 2) the lab is right (result.abnormality_indicator)
1336 indicator = self._payload[self._idx['abnormality_indicator']]
1337 if indicator is not None:
1338 indicator = indicator.strip()
1339 if indicator != u'':
1340 return indicator
1341 # 3) non-numerical value ? then we can' know more
1342 if self._payload[self._idx['val_num']] is None:
1343 return None
1344 # 4) the target range is right
1345 target_min = self._payload[self._idx['val_target_min']]
1346 if target_min is not None:
1347 if target_min > self._payload[self._idx['val_num']]:
1348 return u'-'
1349 target_max = self._payload[self._idx['val_target_max']]
1350 if target_max is not None:
1351 if target_max < self._payload[self._idx['val_num']]:
1352 return u'+'
1353 # 4) the normal range is right
1354 normal_min = self._payload[self._idx['val_normal_min']]
1355 if normal_min is not None:
1356 if normal_min > self._payload[self._idx['val_num']]:
1357 return u'-'
1358 normal_max = self._payload[self._idx['val_normal_max']]
1359 if normal_max is not None:
1360 if normal_max < self._payload[self._idx['val_num']]:
1361 return u'+'
1362 # reviewed, abnormal, but no indicator available
1363 if self._payload[self._idx['is_technically_abnormal']] is True:
1364 return gmTools.u_plus_minus
1365
1366 return None
1367
1368 formatted_abnormality_indicator = property(_get_formatted_abnormality_indicator, lambda x:x)
1369 #--------------------------------------------------------
1370 - def set_review(self, technically_abnormal=None, clinically_relevant=None, comment=None, make_me_responsible=False):
1371
1372 # FIXME: this is not concurrency safe
1373 if self._payload[self._idx['reviewed']]:
1374 self.__change_existing_review (
1375 technically_abnormal = technically_abnormal,
1376 clinically_relevant = clinically_relevant,
1377 comment = comment
1378 )
1379 else:
1380 # do not sign off unreviewed results if
1381 # NOTHING AT ALL is known about them
1382 if technically_abnormal is None:
1383 if clinically_relevant is None:
1384 comment = gmTools.none_if(comment, u'', strip_string = True)
1385 if comment is None:
1386 if make_me_responsible is False:
1387 return True
1388 self.__set_new_review (
1389 technically_abnormal = technically_abnormal,
1390 clinically_relevant = clinically_relevant,
1391 comment = comment
1392 )
1393
1394 if make_me_responsible is True:
1395 cmd = u"SELECT pk FROM dem.staff WHERE db_user = current_user"
1396 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd}])
1397 self['pk_intended_reviewer'] = rows[0][0]
1398 self.save_payload()
1399 return
1400
1401 self.refetch_payload()
1402 #--------------------------------------------------------
1403 - def get_adjacent_results(self, desired_earlier_results=1, desired_later_results=1, max_offset=None):
1404
1405 if desired_earlier_results < 1:
1406 raise ValueError('<desired_earlier_results> must be > 0')
1407
1408 if desired_later_results < 1:
1409 raise ValueError('<desired_later_results> must be > 0')
1410
1411 args = {
1412 'pat': self._payload[self._idx['pk_patient']],
1413 'ttyp': self._payload[self._idx['pk_test_type']],
1414 'tloinc': self._payload[self._idx['loinc_tt']],
1415 'mtyp': self._payload[self._idx['pk_meta_test_type']],
1416 'mloinc': self._payload[self._idx['loinc_meta']],
1417 'when': self._payload[self._idx['clin_when']],
1418 'offset': max_offset
1419 }
1420 WHERE = u'((pk_test_type = %(ttyp)s) OR (loinc_tt = %(tloinc)s))'
1421 WHERE_meta = u'((pk_meta_test_type = %(mtyp)s) OR (loinc_meta = %(mloinc)s))'
1422 if max_offset is not None:
1423 WHERE = WHERE + u' AND (clin_when BETWEEN (%(when)s - %(offset)s) AND (%(when)s + %(offset)s))'
1424 WHERE_meta = WHERE_meta + u' AND (clin_when BETWEEN (%(when)s - %(offset)s) AND (%(when)s + %(offset)s))'
1425
1426 SQL = u"""
1427 SELECT * FROM clin.v_test_results
1428 WHERE
1429 pk_patient = %%(pat)s
1430 AND
1431 clin_when %s %%(when)s
1432 AND
1433 %s
1434 ORDER BY clin_when
1435 LIMIT %s"""
1436
1437 # get earlier results
1438 earlier_results = []
1439 # by type
1440 cmd = SQL % (u'<', WHERE, desired_earlier_results)
1441 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
1442 if len(rows) > 0:
1443 earlier_results.extend([ cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': r}) for r in rows ])
1444 # by meta type ?
1445 missing_results = desired_earlier_results - len(earlier_results)
1446 if missing_results > 0:
1447 cmd = SQL % (u'<', WHERE_meta, missing_results)
1448 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
1449 if len(rows) > 0:
1450 earlier_results.extend([ cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': r}) for r in rows ])
1451
1452 # get later results
1453 later_results = []
1454 # by type
1455 cmd = SQL % (u'>', WHERE, desired_later_results)
1456 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
1457 if len(rows) > 0:
1458 later_results.extend([ cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': r}) for r in rows ])
1459 # by meta type ?
1460 missing_results = desired_later_results - len(later_results)
1461 if missing_results > 0:
1462 cmd = SQL % (u'>', WHERE_meta, missing_results)
1463 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
1464 if len(rows) > 0:
1465 later_results.extend([ cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': r}) for r in rows ])
1466
1467 return earlier_results, later_results
1468 #--------------------------------------------------------
1469 # internal API
1470 #--------------------------------------------------------
1471 - def __set_new_review(self, technically_abnormal=None, clinically_relevant=None, comment=None):
1472 """Add a review to a row.
1473
1474 - if technically abnormal is not provided/None it will be set
1475 to True if the lab's indicator has a meaningful value
1476 - if clinically relevant is not provided/None it is set to
1477 whatever technically abnormal is
1478 """
1479 if technically_abnormal is None:
1480 technically_abnormal = False
1481 if self._payload[self._idx['abnormality_indicator']] is not None:
1482 if self._payload[self._idx['abnormality_indicator']].strip() != u'':
1483 technically_abnormal = True
1484
1485 if clinically_relevant is None:
1486 clinically_relevant = technically_abnormal
1487
1488 cmd = u"""
1489 INSERT INTO clin.reviewed_test_results (
1490 fk_reviewed_row,
1491 is_technically_abnormal,
1492 clinically_relevant,
1493 comment
1494 ) VALUES (
1495 %(pk)s,
1496 %(abnormal)s,
1497 %(relevant)s,
1498 gm.nullify_empty_string(%(cmt)s)
1499 )"""
1500 args = {
1501 'pk': self._payload[self._idx['pk_test_result']],
1502 'abnormal': technically_abnormal,
1503 'relevant': clinically_relevant,
1504 'cmt': comment
1505 }
1506
1507 gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
1508 #--------------------------------------------------------
1509 - def __change_existing_review(self, technically_abnormal=None, clinically_relevant=None, comment=None):
1510 """Change a review on a row.
1511
1512 - if technically abnormal/clinically relevant are
1513 None they are not set
1514 """
1515 args = {
1516 'pk_row': self._payload[self._idx['pk_test_result']],
1517 'abnormal': technically_abnormal,
1518 'relevant': clinically_relevant,
1519 'cmt': comment
1520 }
1521
1522 set_parts = [
1523 u'fk_reviewer = (SELECT pk FROM dem.staff WHERE db_user = current_user)',
1524 u'comment = gm.nullify_empty_string(%(cmt)s)'
1525 ]
1526
1527 if technically_abnormal is not None:
1528 set_parts.append(u'is_technically_abnormal = %(abnormal)s')
1529
1530 if clinically_relevant is not None:
1531 set_parts.append(u'clinically_relevant = %(relevant)s')
1532
1533 cmd = u"""
1534 UPDATE clin.reviewed_test_results SET
1535 %s
1536 WHERE
1537 fk_reviewed_row = %%(pk_row)s
1538 """ % u',\n '.join(set_parts)
1539
1540 gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
1541
1542 #------------------------------------------------------------
1544
1545 where_parts = []
1546
1547 if pk_patient is not None:
1548 where_parts.append(u'pk_patient = %(pat)s')
1549 args = {'pat': pk_patient}
1550
1551 # if tests is not None:
1552 # where_parts.append(u'pk_test_type IN %(tests)s')
1553 # args['tests'] = tuple(tests)
1554
1555 if encounters is not None:
1556 where_parts.append(u'pk_encounter IN %(encs)s')
1557 args['encs'] = tuple(encounters)
1558
1559 if episodes is not None:
1560 where_parts.append(u'pk_episode IN %(epis)s')
1561 args['epis'] = tuple(episodes)
1562
1563 if order_by is None:
1564 order_by = u''
1565 else:
1566 order_by = u'ORDER BY %s' % order_by
1567
1568 cmd = u"""
1569 SELECT * FROM clin.v_test_results
1570 WHERE %s
1571 %s
1572 """ % (
1573 u' AND '.join(where_parts),
1574 order_by
1575 )
1576 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
1577
1578 tests = [ cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': r}) for r in rows ]
1579 return tests
1580
1581 #------------------------------------------------------------
1582 -def get_result_at_timestamp(timestamp=None, test_type=None, loinc=None, tolerance_interval=None, patient=None):
1583
1584 if None not in [test_type, loinc]:
1585 raise ValueError('either <test_type> or <loinc> must be None')
1586
1587 args = {
1588 'pat': patient,
1589 'ttyp': test_type,
1590 'loinc': loinc,
1591 'ts': timestamp,
1592 'intv': tolerance_interval
1593 }
1594
1595 where_parts = [u'pk_patient = %(pat)s']
1596 if test_type is not None:
1597 where_parts.append(u'pk_test_type = %(ttyp)s') # consider: pk_meta_test_type = %(pkmtt)s / self._payload[self._idx['pk_meta_test_type']]
1598 elif loinc is not None:
1599 where_parts.append(u'((loinc_tt IN %(loinc)s) OR (loinc_meta IN %(loinc)s))')
1600 args['loinc'] = tuple(loinc)
1601
1602 if tolerance_interval is None:
1603 where_parts.append(u'clin_when = %(ts)s')
1604 else:
1605 where_parts.append(u'clin_when between (%(ts)s - %(intv)s::interval) AND (%(ts)s + %(intv)s::interval)')
1606
1607 cmd = u"""
1608 SELECT * FROM clin.v_test_results
1609 WHERE
1610 %s
1611 ORDER BY
1612 abs(extract(epoch from age(clin_when, %%(ts)s)))
1613 LIMIT 1""" % u' AND '.join(where_parts)
1614
1615 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
1616 if len(rows) == 0:
1617 return None
1618
1619 return cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': rows[0]})
1620
1621 #------------------------------------------------------------
1623
1624 if None not in [test_type, loinc]:
1625 raise ValueError('either <test_type> or <loinc> must be None')
1626
1627 if no_of_results < 1:
1628 raise ValueError('<no_of_results> must be > 0')
1629
1630 args = {
1631 'pat': patient,
1632 'ttyp': test_type,
1633 'loinc': loinc
1634 }
1635
1636 where_parts = [u'pk_patient = %(pat)s']
1637 if test_type is not None:
1638 where_parts.append(u'pk_test_type = %(ttyp)s') # consider: pk_meta_test_type = %(pkmtt)s / self._payload[self._idx['pk_meta_test_type']]
1639 elif loinc is not None:
1640 where_parts.append(u'((loinc_tt IN %(loinc)s) OR (loinc_meta IN %(loinc)s))')
1641 args['loinc'] = tuple(loinc)
1642
1643 cmd = u"""
1644 SELECT * FROM clin.v_test_results
1645 WHERE
1646 %s
1647 ORDER BY clin_when DESC
1648 LIMIT %s""" % (
1649 u' AND '.join(where_parts),
1650 no_of_results
1651 )
1652 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
1653 if len(rows) == 0:
1654 return None
1655
1656 if no_of_results == 1:
1657 return cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': rows[0]})
1658
1659 return [ cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': r}) for r in rows ]
1660
1661 #------------------------------------------------------------
1663
1664 if None not in [test_type, loinc]:
1665 raise ValueError('either <test_type> or <loinc> must be None')
1666
1667 args = {
1668 'pat': patient,
1669 'ttyp': test_type,
1670 'loinc': loinc
1671 }
1672
1673 where_parts = [u'pk_patient = %(pat)s']
1674 if test_type is not None:
1675 where_parts.append(u'pk_test_type = %(ttyp)s') # consider: pk_meta_test_type = %(pkmtt)s / self._payload[self._idx['pk_meta_test_type']]
1676 elif loinc is not None:
1677 where_parts.append(u'((loinc_tt IN %(loinc)s) OR (loinc_meta IN %(loinc)s))')
1678 args['loinc'] = tuple(loinc)
1679
1680 cmd = u"""
1681 SELECT * FROM clin.v_test_results
1682 WHERE
1683 %s
1684 ORDER BY clin_when
1685 LIMIT 1""" % u' AND '.join(where_parts)
1686 rows, idx = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}], get_col_idx = True)
1687 if len(rows) == 0:
1688 return None
1689
1690 return cTestResult(row = {'pk_field': 'pk_test_result', 'idx': idx, 'data': rows[0]})
1691
1692 #------------------------------------------------------------
1694 try:
1695 pk = int(result)
1696 except (TypeError, AttributeError):
1697 pk = result['pk_test_result']
1698
1699 cmd = u'DELETE FROM clin.test_result WHERE pk = %(pk)s'
1700 gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': {'pk': pk}}])
1701
1702 #------------------------------------------------------------
1703 -def create_test_result(encounter=None, episode=None, type=None, intended_reviewer=None, val_num=None, val_alpha=None, unit=None):
1704
1705 cmd1 = u"""
1706 insert into clin.test_result (
1707 fk_encounter,
1708 fk_episode,
1709 fk_type,
1710 fk_intended_reviewer,
1711 val_num,
1712 val_alpha,
1713 val_unit
1714 ) values (
1715 %(enc)s,
1716 %(epi)s,
1717 %(type)s,
1718 %(rev)s,
1719 %(v_num)s,
1720 %(v_alpha)s,
1721 %(unit)s
1722 )"""
1723
1724 cmd2 = u"""
1725 select *
1726 from
1727 clin.v_test_results
1728 where
1729 pk_test_result = currval(pg_get_serial_sequence('clin.test_result', 'pk'))"""
1730
1731 args = {
1732 u'enc': encounter,
1733 u'epi': episode,
1734 u'type': type,
1735 u'rev': intended_reviewer,
1736 u'v_num': val_num,
1737 u'v_alpha': val_alpha,
1738 u'unit': unit
1739 }
1740
1741 rows, idx = gmPG2.run_rw_queries (
1742 queries = [
1743 {'cmd': cmd1, 'args': args},
1744 {'cmd': cmd2}
1745 ],
1746 return_data = True,
1747 get_col_idx = True
1748 )
1749
1750 tr = cTestResult(row = {
1751 'pk_field': 'pk_test_result',
1752 'idx': idx,
1753 'data': rows[0]
1754 })
1755
1756 return tr
1757
1758 #------------------------------------------------------------
1760
1761 _log.debug(u'formatting test results into [%s]', output_format)
1762
1763 if output_format == u'latex':
1764 return __format_test_results_latex(results = results)
1765
1766 msg = _('unknown test results output format [%s]') % output_format
1767 _log.error(msg)
1768 return msg
1769
1770 #------------------------------------------------------------
1772
1773 if len(results) == 0:
1774 return u'\\begin{minipage}{%s} \\end{minipage}' % width
1775
1776 lines = []
1777 for t in results:
1778
1779 tmp = u''
1780
1781 if show_time:
1782 tmp += u'{\\tiny (%s)} ' % t['clin_when'].strftime('%H:%M')
1783
1784 tmp += u'%.8s' % t['unified_val']
1785
1786 lines.append(tmp)
1787 tmp = u''
1788
1789 if show_range:
1790 has_range = (
1791 t['unified_target_range'] is not None
1792 or
1793 t['unified_target_min'] is not None
1794 or
1795 t['unified_target_max'] is not None
1796 )
1797 if has_range:
1798 if t['unified_target_range'] is not None:
1799 tmp += u'{\\tiny %s}' % t['unified_target_range']
1800 else:
1801 tmp += u'{\\tiny %s}' % (
1802 gmTools.coalesce(t['unified_target_min'], u'- ', u'%s - '),
1803 gmTools.coalesce(t['unified_target_max'], u'', u'%s')
1804 )
1805 lines.append(tmp)
1806
1807 return u'\\begin{minipage}{%s} \\begin{flushright} %s \\end{flushright} \\end{minipage}' % (width, u' \\\\ '.join(lines))
1808
1809 #------------------------------------------------------------
1811
1812 if len(results) == 0:
1813 return u''
1814
1815 lines = []
1816 for t in results:
1817
1818 tmp = u''
1819
1820 if show_time:
1821 tmp += u'\\tiny %s ' % t['clin_when'].strftime('%H:%M')
1822
1823 tmp += u'\\normalsize %.8s' % t['unified_val']
1824
1825 lines.append(tmp)
1826 tmp = u'\\tiny %s' % gmTools.coalesce(t['val_unit'], u'', u'%s ')
1827
1828 if not show_range:
1829 lines.append(tmp)
1830 continue
1831
1832 has_range = (
1833 t['unified_target_range'] is not None
1834 or
1835 t['unified_target_min'] is not None
1836 or
1837 t['unified_target_max'] is not None
1838 )
1839
1840 if not has_range:
1841 lines.append(tmp)
1842 continue
1843
1844 if t['unified_target_range'] is not None:
1845 tmp += u'[%s]' % t['unified_target_range']
1846 else:
1847 tmp += u'[%s%s]' % (
1848 gmTools.coalesce(t['unified_target_min'], u'--', u'%s--'),
1849 gmTools.coalesce(t['unified_target_max'], u'', u'%s')
1850 )
1851 lines.append(tmp)
1852
1853 return u' \\\\ '.join(lines)
1854
1855 #------------------------------------------------------------
1857
1858 if len(results) == 0:
1859 return u'\\noindent %s' % _('No test results to format.')
1860
1861 # discover the columns and rows
1862 dates = {}
1863 tests = {}
1864 grid = {}
1865 for result in results:
1866 # row_label = u'%s \\ \\tiny (%s)}' % (result['unified_abbrev'], result['unified_name'])
1867 row_label = result['unified_abbrev']
1868 tests[row_label] = None
1869 col_label = u'{\\scriptsize %s}' % result['clin_when'].strftime('%Y-%m-%d')
1870 dates[col_label] = None
1871 try:
1872 grid[row_label]
1873 except KeyError:
1874 grid[row_label] = {}
1875 try:
1876 grid[row_label][col_label].append(result)
1877 except KeyError:
1878 grid[row_label][col_label] = [result]
1879
1880 col_labels = sorted(dates.keys(), reverse = True)
1881 del dates
1882 row_labels = sorted(tests.keys())
1883 del tests
1884
1885 col_def = len(col_labels) * u'>{\\raggedleft}p{1.7cm}|'
1886
1887 # format them
1888 tex = u"""\\noindent %s
1889
1890 \\noindent \\begin{tabular}{|l|%s}
1891 \\hline
1892 & %s \\tabularnewline
1893 \\hline
1894
1895 %%s \\tabularnewline
1896
1897 \\hline
1898
1899 \\end{tabular}""" % (
1900 _('Test results'),
1901 col_def,
1902 u' & '.join(col_labels)
1903 )
1904
1905 rows = []
1906
1907 # loop over rows
1908 for rl in row_labels:
1909 cells = [rl]
1910 # loop over cols per row
1911 for cl in col_labels:
1912 try:
1913 # get tests for this (row/col) position
1914 tests = grid[rl][cl]
1915 except KeyError:
1916 # none there, so insert empty cell
1917 cells.append(u' ')
1918 continue
1919
1920 cells.append (
1921 __tests2latex_cell (
1922 results = tests,
1923 show_time = (len(tests) > 1),
1924 show_range = True
1925 )
1926 )
1927
1928 rows.append(u' & '.join(cells))
1929
1930 return tex % u' \\tabularnewline\n \\hline\n'.join(rows)
1931
1932 #============================================================
1934
1935 if filename is None:
1936 filename = gmTools.get_unique_filename(prefix = u'gm2gpl-', suffix = '.dat')
1937
1938 # sort results into series by test type
1939 series = {}
1940 for r in results:
1941 try:
1942 series[r['unified_name']].append(r)
1943 except KeyError:
1944 series[r['unified_name']] = [r]
1945
1946 gp_data = codecs.open(filename, 'wb', 'utf8')
1947
1948 gp_data.write(u'# %s\n' % _('GNUmed test results export for Gnuplot plotting'))
1949 gp_data.write(u'# -------------------------------------------------------------\n')
1950 gp_data.write(u'# first line of index: test type abbreviation & name\n')
1951 gp_data.write(u'#\n')
1952 gp_data.write(u'# clin_when at full precision\n')
1953 gp_data.write(u'# value\n')
1954 gp_data.write(u'# unit\n')
1955 gp_data.write(u'# unified (target or normal) range: lower bound\n')
1956 gp_data.write(u'# unified (target or normal) range: upper bound\n')
1957 gp_data.write(u'# normal range: lower bound\n')
1958 gp_data.write(u'# normal range: upper bound\n')
1959 gp_data.write(u'# target range: lower bound\n')
1960 gp_data.write(u'# target range: upper bound\n')
1961 gp_data.write(u'# clin_when formatted into string as x-axis tic label\n')
1962 gp_data.write(u'# -------------------------------------------------------------\n')
1963
1964 for test_type in series.keys():
1965 if len(series[test_type]) == 0:
1966 continue
1967
1968 r = series[test_type][0]
1969 title = u'%s (%s)' % (
1970 r['unified_abbrev'],
1971 r['unified_name']
1972 )
1973 gp_data.write(u'\n\n"%s" "%s"\n' % (title, title))
1974
1975 prev_date = None
1976 prev_year = None
1977 for r in series[test_type]:
1978 curr_date = r['clin_when'].strftime('%Y-%m-%d')
1979 if curr_date == prev_date:
1980 gp_data.write(u'\n# %s\n' % _('blank line inserted to allow for discontinued line drawing for same-day values'))
1981 if show_year:
1982 if r['clin_when'].year == prev_year:
1983 when_template = '%b %d %H:%M'
1984 else:
1985 when_template = '%b %d %H:%M (%Y)'
1986 prev_year = r['clin_when'].year
1987 else:
1988 when_template = '%b %d'
1989 gp_data.write (u'%s %s "%s" %s %s %s %s %s %s "%s"\n' % (
1990 r['clin_when'].strftime('%Y-%m-%d_%H:%M'),
1991 r['unified_val'],
1992 gmTools.coalesce(r['val_unit'], u'"<?>"'),
1993 gmTools.coalesce(r['unified_target_min'], u'"<?>"'),
1994 gmTools.coalesce(r['unified_target_max'], u'"<?>"'),
1995 gmTools.coalesce(r['val_normal_min'], u'"<?>"'),
1996 gmTools.coalesce(r['val_normal_max'], u'"<?>"'),
1997 gmTools.coalesce(r['val_target_min'], u'"<?>"'),
1998 gmTools.coalesce(r['val_target_max'], u'"<?>"'),
1999 gmDateTime.pydt_strftime (
2000 r['clin_when'],
2001 format = when_template,
2002 accuracy = gmDateTime.acc_minutes
2003 )
2004 ))
2005 prev_date = curr_date
2006
2007 gp_data.close()
2008
2009 return filename
2010
2011 #============================================================
2013 """Represents one lab result."""
2014
2015 _cmd_fetch_payload = """
2016 select *, xmin_test_result from v_results4lab_req
2017 where pk_result=%s"""
2018 _cmds_lock_rows_for_update = [
2019 """select 1 from test_result where pk=%(pk_result)s and xmin=%(xmin_test_result)s for update"""
2020 ]
2021 _cmds_store_payload = [
2022 """update test_result set
2023 clin_when = %(val_when)s,
2024 narrative = %(progress_note_result)s,
2025 fk_type = %(pk_test_type)s,
2026 val_num = %(val_num)s::numeric,
2027 val_alpha = %(val_alpha)s,
2028 val_unit = %(val_unit)s,
2029 val_normal_min = %(val_normal_min)s,
2030 val_normal_max = %(val_normal_max)s,
2031 val_normal_range = %(val_normal_range)s,
2032 val_target_min = %(val_target_min)s,
2033 val_target_max = %(val_target_max)s,
2034 val_target_range = %(val_target_range)s,
2035 abnormality_indicator = %(abnormal)s,
2036 norm_ref_group = %(ref_group)s,
2037 note_provider = %(note_provider)s,
2038 material = %(material)s,
2039 material_detail = %(material_detail)s
2040 where pk = %(pk_result)s""",
2041 """select xmin_test_result from v_results4lab_req where pk_result=%(pk_result)s"""
2042 ]
2043
2044 _updatable_fields = [
2045 'val_when',
2046 'progress_note_result',
2047 'val_num',
2048 'val_alpha',
2049 'val_unit',
2050 'val_normal_min',
2051 'val_normal_max',
2052 'val_normal_range',
2053 'val_target_min',
2054 'val_target_max',
2055 'val_target_range',
2056 'abnormal',
2057 'ref_group',
2058 'note_provider',
2059 'material',
2060 'material_detail'
2061 ]
2062 #--------------------------------------------------------
2064 """Instantiate.
2065
2066 aPK_obj as dict:
2067 - patient_id
2068 - when_field (see view definition)
2069 - when
2070 - test_type
2071 - val_num
2072 - val_alpha
2073 - unit
2074 """
2075 # instantiate from row data ?
2076 if aPK_obj is None:
2077 gmBusinessDBObject.cBusinessDBObject.__init__(self, row=row)
2078 return
2079 pk = aPK_obj
2080 # find PK from row data ?
2081 if type(aPK_obj) == types.DictType:
2082 # sanity checks
2083 if None in [aPK_obj['patient_id'], aPK_obj['when'], aPK_obj['when_field'], aPK_obj['test_type'], aPK_obj['unit']]:
2084 raise gmExceptions.ConstructorError, 'parameter error: %s' % aPK_obj
2085 if (aPK_obj['val_num'] is None) and (aPK_obj['val_alpha'] is None):
2086 raise gmExceptions.ConstructorError, 'parameter error: val_num and val_alpha cannot both be None'
2087 # get PK
2088 where_snippets = [
2089 'pk_patient=%(patient_id)s',
2090 'pk_test_type=%(test_type)s',
2091 '%s=%%(when)s' % aPK_obj['when_field'],
2092 'val_unit=%(unit)s'
2093 ]
2094 if aPK_obj['val_num'] is not None:
2095 where_snippets.append('val_num=%(val_num)s::numeric')
2096 if aPK_obj['val_alpha'] is not None:
2097 where_snippets.append('val_alpha=%(val_alpha)s')
2098
2099 where_clause = ' and '.join(where_snippets)
2100 cmd = "select pk_result from v_results4lab_req where %s" % where_clause
2101 data = gmPG.run_ro_query('historica', cmd, None, aPK_obj)
2102 if data is None:
2103 raise gmExceptions.ConstructorError, 'error getting lab result for: %s' % aPK_obj
2104 if len(data) == 0:
2105 raise gmExceptions.NoSuchClinItemError, 'no lab result for: %s' % aPK_obj
2106 pk = data[0][0]
2107 # instantiate class
2108 gmBusinessDBObject.cBusinessDBObject.__init__(self, aPK_obj=pk)
2109 #--------------------------------------------------------
2111 cmd = """
2112 select
2113 %s,
2114 vbp.title,
2115 vbp.firstnames,
2116 vbp.lastnames,
2117 vbp.dob
2118 from v_basic_person vbp
2119 where vbp.pk_identity=%%s""" % self._payload[self._idx['pk_patient']]
2120 pat = gmPG.run_ro_query('historica', cmd, None, self._payload[self._idx['pk_patient']])
2121 return pat[0]
2122 #============================================================
2124 """Represents one lab request."""
2125
2126 _cmd_fetch_payload = """
2127 select *, xmin_lab_request from v_lab_requests
2128 where pk_request=%s"""
2129 _cmds_lock_rows_for_update = [
2130 """select 1 from lab_request where pk=%(pk_request)s and xmin=%(xmin_lab_request)s for update"""
2131 ]
2132 _cmds_store_payload = [
2133 """update lab_request set
2134 request_id=%(request_id)s,
2135 lab_request_id=%(lab_request_id)s,
2136 clin_when=%(sampled_when)s,
2137 lab_rxd_when=%(lab_rxd_when)s,
2138 results_reported_when=%(results_reported_when)s,
2139 request_status=%(request_status)s,
2140 is_pending=%(is_pending)s::bool,
2141 narrative=%(progress_note)s
2142 where pk=%(pk_request)s""",
2143 """select xmin_lab_request from v_lab_requests where pk_request=%(pk_request)s"""
2144 ]
2145 _updatable_fields = [
2146 'request_id',
2147 'lab_request_id',
2148 'sampled_when',
2149 'lab_rxd_when',
2150 'results_reported_when',
2151 'request_status',
2152 'is_pending',
2153 'progress_note'
2154 ]
2155 #--------------------------------------------------------
2157 """Instantiate lab request.
2158
2159 The aPK_obj can be either a dict with the keys "req_id"
2160 and "lab" or a simple primary key.
2161 """
2162 # instantiate from row data ?
2163 if aPK_obj is None:
2164 gmBusinessDBObject.cBusinessDBObject.__init__(self, row=row)
2165 return
2166 pk = aPK_obj
2167 # instantiate from "req_id" and "lab" ?
2168 if type(aPK_obj) == types.DictType:
2169 # sanity check
2170 try:
2171 aPK_obj['req_id']
2172 aPK_obj['lab']
2173 except:
2174 _log.exception('[%s:??]: faulty <aPK_obj> structure: [%s]' % (self.__class__.__name__, aPK_obj), sys.exc_info())
2175 raise gmExceptions.ConstructorError, '[%s:??]: cannot derive PK from [%s]' % (self.__class__.__name__, aPK_obj)
2176 # generate query
2177 where_snippets = []
2178 vals = {}
2179 where_snippets.append('request_id=%(req_id)s')
2180 if type(aPK_obj['lab']) == types.IntType:
2181 where_snippets.append('pk_test_org=%(lab)s')
2182 else:
2183 where_snippets.append('lab_name=%(lab)s')
2184 where_clause = ' and '.join(where_snippets)
2185 cmd = "select pk_request from v_lab_requests where %s" % where_clause
2186 # get pk
2187 data = gmPG.run_ro_query('historica', cmd, None, aPK_obj)
2188 if data is None:
2189 raise gmExceptions.ConstructorError, '[%s:??]: error getting lab request for [%s]' % (self.__class__.__name__, aPK_obj)
2190 if len(data) == 0:
2191 raise gmExceptions.NoSuchClinItemError, '[%s:??]: no lab request for [%s]' % (self.__class__.__name__, aPK_obj)
2192 pk = data[0][0]
2193 # instantiate class
2194 gmBusinessDBObject.cBusinessDBObject.__init__(self, aPK_obj=pk)
2195 #--------------------------------------------------------
2197 cmd = """
2198 select vpi.pk_patient, vbp.title, vbp.firstnames, vbp.lastnames, vbp.dob
2199 from v_pat_items vpi, v_basic_person vbp
2200 where
2201 vpi.pk_item=%s
2202 and
2203 vbp.pk_identity=vpi.pk_patient"""
2204 pat = gmPG.run_ro_query('historica', cmd, None, self._payload[self._idx['pk_item']])
2205 if pat is None:
2206 _log.error('cannot get patient for lab request [%s]' % self._payload[self._idx['pk_item']])
2207 return None
2208 if len(pat) == 0:
2209 _log.error('no patient associated with lab request [%s]' % self._payload[self._idx['pk_item']])
2210 return None
2211 return pat[0]
2212 #============================================================
2213 # convenience functions
2214 #------------------------------------------------------------
2215 -def create_lab_request(lab=None, req_id=None, pat_id=None, encounter_id=None, episode_id=None):
2216 """Create or get lab request.
2217
2218 returns tuple (status, value):
2219 (True, lab request instance)
2220 (False, error message)
2221 (None, housekeeping_todo primary key)
2222 """
2223 req = None
2224 aPK_obj = {
2225 'lab': lab,
2226 'req_id': req_id
2227 }
2228 try:
2229 req = cLabRequest (aPK_obj)
2230 except gmExceptions.NoSuchClinItemError, msg:
2231 _log.info('%s: will try to create lab request' % str(msg))
2232 except gmExceptions.ConstructorError, msg:
2233 _log.exception(str(msg), sys.exc_info(), verbose=0)
2234 return (False, msg)
2235 # found
2236 if req is not None:
2237 db_pat = req.get_patient()
2238 if db_pat is None:
2239 _log.error('cannot cross-check patient on lab request')
2240 return (None, '')
2241 # yes but ambigous
2242 if pat_id != db_pat[0]:
2243 _log.error('lab request found for [%s:%s] but patient mismatch: expected [%s], in DB [%s]' % (lab, req_id, pat_id, db_pat))
2244 me = '$RCSfile: gmPathLab.py,v $ $Revision: 1.81 $'
2245 to = 'user'
2246 prob = _('The lab request already exists but belongs to a different patient.')
2247 sol = _('Verify which patient this lab request really belongs to.')
2248 ctxt = _('lab [%s], request ID [%s], expected link with patient [%s], currently linked to patient [%s]') % (lab, req_id, pat_id, db_pat)
2249 cat = 'lab'
2250 status, data = gmPG.add_housekeeping_todo(me, to, prob, sol, ctxt, cat)
2251 return (None, data)
2252 return (True, req)
2253 # not found
2254 queries = []
2255 if type(lab) is types.IntType:
2256 cmd = "insert into lab_request (fk_encounter, fk_episode, fk_test_org, request_id) values (%s, %s, %s, %s)"
2257 else:
2258 cmd = "insert into lab_request (fk_encounter, fk_episode, fk_test_org, request_id) values (%s, %s, (select pk from test_org where internal_OBSOLETE_name=%s), %s)"
2259 queries.append((cmd, [encounter_id, episode_id, str(lab), req_id]))
2260 cmd = "select currval('lab_request_pk_seq')"
2261 queries.append((cmd, []))
2262 # insert new
2263 result, err = gmPG.run_commit('historica', queries, True)
2264 if result is None:
2265 return (False, err)
2266 try:
2267 req = cLabRequest(aPK_obj=result[0][0])
2268 except gmExceptions.ConstructorError, msg:
2269 _log.exception(str(msg), sys.exc_info(), verbose=0)
2270 return (False, msg)
2271 return (True, req)
2272 #------------------------------------------------------------
2273 -def create_lab_result(patient_id=None, when_field=None, when=None, test_type=None, val_num=None, val_alpha=None, unit=None, encounter_id=None, request=None):
2274 tres = None
2275 data = {
2276 'patient_id': patient_id,
2277 'when_field': when_field,
2278 'when': when,
2279 'test_type': test_type,
2280 'val_num': val_num,
2281 'val_alpha': val_alpha,
2282 'unit': unit
2283 }
2284 try:
2285 tres = cLabResult(aPK_obj=data)
2286 # exists already, so fail
2287 _log.error('will not overwrite existing test result')
2288 _log.debug(str(tres))
2289 return (None, tres)
2290 except gmExceptions.NoSuchClinItemError:
2291 _log.debug('test result not found - as expected, will create it')
2292 except gmExceptions.ConstructorError, msg:
2293 _log.exception(str(msg), sys.exc_info(), verbose=0)
2294 return (False, msg)
2295 if request is None:
2296 return (False, _('need lab request when inserting lab result'))
2297 # not found
2298 if encounter_id is None:
2299 encounter_id = request['pk_encounter']
2300 queries = []
2301 cmd = "insert into test_result (fk_encounter, fk_episode, fk_type, val_num, val_alpha, val_unit) values (%s, %s, %s, %s, %s, %s)"
2302 queries.append((cmd, [encounter_id, request['pk_episode'], test_type, val_num, val_alpha, unit]))
2303 cmd = "insert into lnk_result2lab_req (fk_result, fk_request) values ((select currval('test_result_pk_seq')), %s)"
2304 queries.append((cmd, [request['pk_request']]))
2305 cmd = "select currval('test_result_pk_seq')"
2306 queries.append((cmd, []))
2307 # insert new
2308 result, err = gmPG.run_commit('historica', queries, True)
2309 if result is None:
2310 return (False, err)
2311 try:
2312 tres = cLabResult(aPK_obj=result[0][0])
2313 except gmExceptions.ConstructorError, msg:
2314 _log.exception(str(msg), sys.exc_info(), verbose=0)
2315 return (False, msg)
2316 return (True, tres)
2317 #------------------------------------------------------------
2319 # sanity check
2320 if limit < 1:
2321 limit = 1
2322 # retrieve one more row than needed so we know there's more available ;-)
2323 lim = limit + 1
2324 cmd = """
2325 select pk_result
2326 from v_results4lab_req
2327 where reviewed is false
2328 order by pk_patient
2329 limit %s""" % lim
2330 rows = gmPG.run_ro_query('historica', cmd)
2331 if rows is None:
2332 _log.error('error retrieving unreviewed lab results')
2333 return (None, _('error retrieving unreviewed lab results'))
2334 if len(rows) == 0:
2335 return (False, [])
2336 # more than LIMIT rows ?
2337 if len(rows) == lim:
2338 more_avail = True
2339 # but deliver only LIMIT rows so that our assumption holds true...
2340 del rows[limit]
2341 else:
2342 more_avail = False
2343 results = []
2344 for row in rows:
2345 try:
2346 results.append(cLabResult(aPK_obj=row[0]))
2347 except gmExceptions.ConstructorError:
2348 _log.exception('skipping unreviewed lab result [%s]' % row[0], sys.exc_info(), verbose=0)
2349 return (more_avail, results)
2350 #------------------------------------------------------------
2352 lim = limit + 1
2353 cmd = "select pk from lab_request where is_pending is true limit %s" % lim
2354 rows = gmPG.run_ro_query('historica', cmd)
2355 if rows is None:
2356 _log.error('error retrieving pending lab requests')
2357 return (None, None)
2358 if len(rows) == 0:
2359 return (False, [])
2360 results = []
2361 # more than LIMIT rows ?
2362 if len(rows) == lim:
2363 too_many = True
2364 # but deliver only LIMIT rows so that our assumption holds true...
2365 del rows[limit]
2366 else:
2367 too_many = False
2368 requests = []
2369 for row in rows:
2370 try:
2371 requests.append(cLabRequest(aPK_obj=row[0]))
2372 except gmExceptions.ConstructorError:
2373 _log.exception('skipping pending lab request [%s]' % row[0], sys.exc_info(), verbose=0)
2374 return (too_many, requests)
2375 #------------------------------------------------------------
2377 """Get logically next request ID for given lab.
2378
2379 - incrementor_func:
2380 - if not supplied the next ID is guessed
2381 - if supplied it is applied to the most recently used ID
2382 """
2383 if type(lab) == types.IntType:
2384 lab_snippet = 'vlr.fk_test_org=%s'
2385 else:
2386 lab_snippet = 'vlr.lab_name=%s'
2387 lab = str(lab)
2388 cmd = """
2389 select request_id
2390 from lab_request lr0
2391 where lr0.clin_when = (
2392 select max(vlr.sampled_when)
2393 from v_lab_requests vlr
2394 where %s
2395 )""" % lab_snippet
2396 rows = gmPG.run_ro_query('historica', cmd, None, lab)
2397 if rows is None:
2398 _log.warning('error getting most recently used request ID for lab [%s]' % lab)
2399 return ''
2400 if len(rows) == 0:
2401 return ''
2402 most_recent = rows[0][0]
2403 # apply supplied incrementor
2404 if incrementor_func is not None:
2405 try:
2406 next = incrementor_func(most_recent)
2407 except TypeError:
2408 _log.error('cannot call incrementor function [%s]' % str(incrementor_func))
2409 return most_recent
2410 return next
2411 # try to be smart ourselves
2412 for pos in range(len(most_recent)):
2413 header = most_recent[:pos]
2414 trailer = most_recent[pos:]
2415 try:
2416 return '%s%s' % (header, str(int(trailer) + 1))
2417 except ValueError:
2418 header = most_recent[:-1]
2419 trailer = most_recent[-1:]
2420 return '%s%s' % (header, chr(ord(trailer) + 1))
2421 #============================================================
2423 """Calculate BMI.
2424
2425 mass: kg
2426 height: cm
2427 age: not yet used
2428
2429 returns:
2430 (True/False, data)
2431 True: data = (bmi, lower_normal, upper_normal)
2432 False: data = error message
2433 """
2434 converted, mass = gmTools.input2decimal(mass)
2435 if not converted:
2436 return False, u'mass: cannot convert <%s> to Decimal' % mass
2437
2438 converted, height = gmTools.input2decimal(height)
2439 if not converted:
2440 return False, u'height: cannot convert <%s> to Decimal' % height
2441
2442 approx_surface = (height / decimal.Decimal(100))**2
2443 bmi = mass / approx_surface
2444
2445 print mass, height, '->', approx_surface, '->', bmi
2446
2447 lower_normal_mass = 20.0 * approx_surface
2448 upper_normal_mass = 25.0 * approx_surface
2449
2450 return True, (bmi, lower_normal_mass, upper_normal_mass)
2451 #============================================================
2452 # main - unit testing
2453 #------------------------------------------------------------
2454 if __name__ == '__main__':
2455
2456 if len(sys.argv) < 2:
2457 sys.exit()
2458
2459 if sys.argv[1] != 'test':
2460 sys.exit()
2461
2462 import time
2463
2464 gmI18N.activate_locale()
2465 gmI18N.install_domain()
2466
2467 #------------------------------------------
2469 tr = create_test_result (
2470 encounter = 1,
2471 episode = 1,
2472 type = 1,
2473 intended_reviewer = 1,
2474 val_num = '12',
2475 val_alpha=None,
2476 unit = 'mg/dl'
2477 )
2478 print tr
2479 return tr
2480 #------------------------------------------
2484 #------------------------------------------
2486 r = cTestResult(aPK_obj=1)
2487 print r
2488 #print r.reference_ranges
2489 print r.formatted_range
2490 print r.temporally_closest_normal_range
2491 #------------------------------------------
2493 print "test_result()"
2494 # lab_result = cLabResult(aPK_obj=4)
2495 data = {
2496 'patient_id': 12,
2497 'when_field': 'val_when',
2498 'when': '2000-09-17 18:23:00+02',
2499 'test_type': 9,
2500 'val_num': 17.3,
2501 'val_alpha': None,
2502 'unit': 'mg/l'
2503 }
2504 lab_result = cLabResult(aPK_obj=data)
2505 print lab_result
2506 fields = lab_result.get_fields()
2507 for field in fields:
2508 print field, ':', lab_result[field]
2509 print "updatable:", lab_result.get_updatable_fields()
2510 print time.time()
2511 print lab_result.get_patient()
2512 print time.time()
2513 #------------------------------------------
2515 print "test_request()"
2516 try:
2517 # lab_req = cLabRequest(aPK_obj=1)
2518 # lab_req = cLabRequest(req_id='EML#SC937-0176-CEC#11', lab=2)
2519 data = {
2520 'req_id': 'EML#SC937-0176-CEC#11',
2521 'lab': 'Enterprise Main Lab'
2522 }
2523 lab_req = cLabRequest(aPK_obj=data)
2524 except gmExceptions.ConstructorError, msg:
2525 print "no such lab request:", msg
2526 return
2527 print lab_req
2528 fields = lab_req.get_fields()
2529 for field in fields:
2530 print field, ':', lab_req[field]
2531 print "updatable:", lab_req.get_updatable_fields()
2532 print time.time()
2533 print lab_req.get_patient()
2534 print time.time()
2535 #--------------------------------------------------------
2540 #--------------------------------------------------------
2545 #--------------------------------------------------------
2547 print create_measurement_type (
2548 lab = None,
2549 abbrev = u'tBZ2',
2550 unit = u'mg%',
2551 name = 'BZ (test 2)'
2552 )
2553 #--------------------------------------------------------
2558 #--------------------------------------------------------
2563 #--------------------------------------------------------
2565 results = [
2566 cTestResult(aPK_obj=1),
2567 cTestResult(aPK_obj=2),
2568 cTestResult(aPK_obj=3)
2569 # cTestResult(aPK_obj=4)
2570 ]
2571 print format_test_results(results = results)
2572 #--------------------------------------------------------
2574 done, data = calculate_bmi(mass = sys.argv[2], height = sys.argv[3])
2575 bmi, low, high = data
2576
2577 print "BMI:", bmi
2578 print "low:", low, "kg"
2579 print "hi :", high, "kg"
2580 #--------------------------------------------------------
2585 #--------------------------------------------------------
2586
2587 test_result()
2588 #test_create_test_result()
2589 #test_delete_test_result()
2590 #test_create_measurement_type()
2591 #test_lab_result()
2592 #test_request()
2593 #test_create_result()
2594 #test_unreviewed()
2595 #test_pending()
2596 #test_meta_test_type()
2597 #test_test_type()
2598 #test_format_test_results()
2599 #test_calculate_bmi()
2600 #test_test_panel()
2601
2602 #============================================================
2603
| Home | Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0.1 on Sat Aug 3 03:57:01 2013 | http://epydoc.sourceforge.net |