daemon.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. """
  2. """
  3. import argparse
  4. import logging
  5. import os
  6. import subprocess
  7. import sys
  8. from datetime import datetime as dt
  9. from evdev import InputDevice, ecodes
  10. from rfidacd import __version__
  11. __author__ = "Thomas Verchow"
  12. __copyright__ = "Thomas Verchow"
  13. __license__ = "MIT"
  14. _logger = logging.getLogger(__name__)
  15. SHELL = os.getenv("SHELL", "/bin/sh")
  16. def parse_args(args):
  17. """
  18. """
  19. def dir_path(path: str):
  20. if os.path.isdir(path):
  21. return path
  22. else:
  23. parser.error(f"{path} is not a readable directory")
  24. parser = argparse.ArgumentParser(description="RFID action commander daemon")
  25. parser.add_argument(
  26. "--version", action="version", version="rfidacd {ver}".format(ver=__version__),
  27. )
  28. parser.add_argument(
  29. "-d", "--device", help="device to listen", type=argparse.FileType("r")
  30. )
  31. parser.add_argument(
  32. "-p", "--path", default=".", help="path to action scripts", type=dir_path
  33. )
  34. parser.add_argument(
  35. "-s", "--seconds", default=5, help="seconds between same rfid", type=int
  36. )
  37. parser.add_argument(
  38. "-v",
  39. "--verbose",
  40. dest="loglevel",
  41. help="set loglevel to INFO",
  42. action="store_const",
  43. const=logging.INFO,
  44. )
  45. parser.add_argument(
  46. "-vv",
  47. "--very-verbose",
  48. dest="loglevel",
  49. help="set loglevel to DEBUG",
  50. action="store_const",
  51. const=logging.DEBUG,
  52. )
  53. return parser.parse_args(args)
  54. def read_rfid(device):
  55. combined_string = ""
  56. for event in InputDevice(device.name).read_loop():
  57. if event.type == ecodes.EV_KEY and event.value == 0: # value 0 = release key
  58. if event.code == 28: # code 28 = KEY_ENTER
  59. return combined_string
  60. # [4:5]? .. KEY[] starts with 'KEY_' and we expect one char
  61. combined_string += ecodes.KEY[event.code][4:5]
  62. def run_action(script: str):
  63. _logger.info(f"RFID action: run '{SHELL} {script}'")
  64. try:
  65. r = subprocess.call([SHELL, script])
  66. except Exception as err:
  67. _logger.info(err)
  68. last_rfid = None
  69. def main(args):
  70. args = parse_args(args)
  71. logging.basicConfig(level=args.loglevel)
  72. _logger.debug(f"Device to read from: {args.device}")
  73. _logger.debug(f"Shell to execute commands: {SHELL}")
  74. _logger.debug(f"Expect scripts to run in {args.path}")
  75. last_rfid = None
  76. last_ts = dt.now()
  77. while True:
  78. rfid = read_rfid(args.device)
  79. seconds_gone = (dt.now() - last_ts).total_seconds()
  80. script = os.path.join(args.path, rfid)
  81. if os.path.islink(script): # special RFID - always run!
  82. _logger.info(f"Action script {script} is special/link.")
  83. run_action(script)
  84. continue
  85. if not os.path.isfile(script):
  86. _logger.info(f"Action script {script} not found.")
  87. continue
  88. if rfid == last_rfid and seconds_gone <= args.seconds:
  89. _logger.info(f"Same RFID after {seconds_gone:.1f} seconds. Do nothing.")
  90. continue
  91. last_ts = dt.now()
  92. last_rfid = rfid
  93. run_action(script)
  94. def run():
  95. main(sys.argv[1:])
  96. if __name__ == "__main__":
  97. run()