Sử dụng Background Subtraction phát hiện vật chuyển động

10/26/2021 computer vision

Bài này sẽ hướng dẫn cách tạo ứng dụng để phát hiện bất thường của camera, thông báo cảnh báo đó tới cho người dùng thông qua slack (hoặc email và line cũng có thể làm tưởng tự).

# Thuật toán

Thuật toán sẽ như sau:

  • Chuyển đồi frame về gray image
  • Lấy sai khác của 2 frame liên tiếp
  • Chuyển sai khác đó về dạng binary
  • Định lượng sai khác bằng cách đếm số pixel trắng trên ảnh binary
  • So sánh sai khác đó với ngưỡng mà user thiết lập
  • Nếu vượt quá ngưỡng thì thông báo và gửi ảnh bằng chứng đến slack

# Cách lấy token của Slack API

  • Đầu tiên cần mặc định rằng chúng ta có quyền cài đặt app trong slack workspace. Chúng ta chọn Workspace settings

workspace_settings

  • Sau đó chọn đến phần Configure apps

confiure_apps

  • Sau đó chọn Custom integrations

confiure_integration

  • Trong cửa sổ tìm kiếm gõ bots để tìm app

bots

  • Tích hợp bots app vào trong workspace

integrations

  • Copy token API

get_token

# Cách lấy channelID

Để lấy channelID chỉ cần vào channel đó trên browser như google chrome hay firefox, sau đó tìm đến chuỗi kỹ tự cuối cùng trong URL, đó chính là channel ID

  • Ví dụ như: https://app.slack.com/client/XXXXXXXXXXX/C02KL7D9GN5 thì C02KL7D9GN5 chính là channelID

# Background Subtraction

import io
import cv2
import requests
from time import sleep

# slack notify setting
SLACK_URL = 'https://slack.com/api/files.upload'
# @TODO change this value
CHANNEL_ID = 'C02KL7D9GN5'
# @TODO change this value
SLACK_TOKEN = 'xoxb-2644784446804-2643559509221-HTx9IPn3SuBNUTEoQvvZOpxx'


def image_diff(before_image_gray, current_image_gray):
    diff = cv2.absdiff(before_image_gray, current_image_gray)
    _, diff_binary = cv2.threshold(diff, 50, 255, cv2.THRESH_BINARY)
    sum_diff = cv2.countNonZero(diff_binary)
    thresh = before_image_gray.shape[0] * before_image_gray.shape[1] * 0.1
    print('image_shape: {}'.format(before_image_gray.shape))
    print('sum_diff: {}'.format(sum_diff))
    print('thresh: {}'.format(thresh))
    if sum_diff < thresh:
        return False
    return True


def Interval():
    cap = cv2.VideoCapture(0)
    before_image_origin = None
    before_image_gray = None
    while True:
        try:
            _, current_image_origin = cap.read()
            if current_image_origin is None:
                sleep(1)
                continue
            # calculate a differrence between a previous image and a current image
            save_im = False
            current_image_gray = cv2.cvtColor(current_image_origin, cv2.COLOR_BGR2GRAY)
            if before_image_gray is None:
                before_image_gray = current_image_gray
                before_image_origin = current_image_origin
            else:
                save_im = image_diff(before_image_gray, current_image_gray)
                print(save_im)
                if save_im:
                    cv2.putText(before_image_origin,
                                text='before',
                                org=(20, 50),
                                fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                                fontScale=1.0,
                                color=(0, 255, 0),
                                thickness=2,
                                lineType=cv2.LINE_4)
                    cv2.putText(current_image_origin,
                                text='after',
                                org=(20, 50),
                                fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                                fontScale=1.0,
                                color=(0, 255, 0),
                                thickness=2,
                                lineType=cv2.LINE_4)
                    hconcat = cv2.hconcat([before_image_origin, current_image_origin])

                    # slack notify
                    if SLACK_TOKEN and CHANNEL_ID:
                        try:
                            encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 100]
                            _, encoded_image = cv2.imencode('.jpeg', hconcat, encode_param)

                            files = {
                                'file': io.BytesIO(encoded_image)
                            }
                            param = {
                                'token': SLACK_TOKEN,
                                'channels': CHANNEL_ID,
                                'filename': "detected.jpeg",
                                'initial_comment': "Detected!",
                                'title': "image"
                            }

                            response = requests.post(SLACK_URL, params=param, files=files)
                            print(response)
                            sleep(60)
                        except Exception as e:
                            print('Could not send image to Slack: {}'.format(e))

                before_image_gray = current_image_gray
                before_image_origin = current_image_origin

            cv2.imshow("Frame", current_image_origin)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            sleep(1)
        except Exception as e:
            print(e)
            sleep(60)


def main():
    Interval()


if __name__ == '__main__':
    main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105

# Kết quả

Result

Last Updated: 11/3/2021, 1:05:03 PM