# pyCvExtractAvgColor.py
# Extract the average color of a (fixed or KCF-tracked) ROI from a video
# into a CSV file and plot the r/g/b channel averages over time.
import argparse
import cv2
from matplotlib import pylab as plt
import numpy as np
import pandas as pd
from pathlib import PurePath
from queue import Queue
import sys
from threading import Thread
import time
import tqdm
import os
class FileVideoStream:
    """Threaded cv2.VideoCapture reader.

    A daemon thread decodes frames into a bounded queue so the consumer
    (GUI loop / processing loop) never blocks on disk or codec latency.
    ``fps`` acts as a decimation factor: for every frame delivered, the
    reader grabs ``fps`` frames and keeps only the last one.
    """

    def __init__(self, path, fps=1, queueSize=128):
        # open the stream and initialize the flag used by update()/stop()
        self.stream = cv2.VideoCapture(path)
        self.stopped = False
        self.fps = fps
        # bounded queue of decoded frames; maxsize caps memory use
        self.Q = Queue(maxsize=queueSize)
        # reader thread handle (set in start())
        self.thread = None

    def start(self):
        """Launch the daemon reader thread; returns self for chaining."""
        self.thread = Thread(target=self.update, args=(), daemon=True)
        self.thread.start()
        return self

    def update(self):
        """Producer loop: runs on the reader thread until stopped or EOF."""
        while not self.stopped:
            if self.Q.full():
                # BUGFIX: the original busy-spun at 100% CPU while the
                # queue was full; yield briefly instead of spinning.
                time.sleep(0.01)
                continue
            frame = None
            # grab `fps` frames, keep only the last one (frame decimation)
            for _ in range(self.fps):
                grabbed, frame = self.stream.read()
                if not grabbed:
                    # end of the video file: signal consumers and exit
                    self.stop()
                    return
            self.Q.put(frame)

    def read(self):
        """Return the next decoded frame (blocks if the queue is empty)."""
        return self.Q.get()

    def more(self):
        """Return True while decoded frames are waiting in the queue."""
        return self.Q.qsize() > 0

    def stop(self):
        """Ask the reader thread to exit; it checks the flag each pass."""
        self.stopped = True
def plot_result(csv_path, fps):
    """Plot the per-frame r/g/b averages stored in *csv_path*.

    The CSV must contain ``r``, ``g`` and ``b`` columns (as written by
    the extraction loop).  The figure is saved next to the CSV with the
    same stem and a ``.png`` extension.

    Args:
        csv_path: path of the colors CSV produced by the main loop.
        fps: frames per second of the video, used to build the time axis.
    """
    df = pd.read_csv(csv_path)
    # one sample per frame -> time axis in seconds
    t = np.linspace(0, len(df.r) / fps, len(df.r))
    plt.plot(t, df.r, 'r', label='red')
    plt.plot(t, df.g, 'g', label='green')
    plt.plot(t, df.b, 'b', label='blue')
    plt.xlabel('time [s]')
    plt.legend()
    # BUGFIX: the original used `csv_path[:-4]`, which silently mangles
    # any path whose extension is not exactly 4 characters long; strip
    # the extension properly instead.
    plt.savefig(f'{os.path.splitext(csv_path)[0]}.png')
    plt.close()
if __name__ == "__main__":
    # ------------------------------------------------------------------
    # Command-line interface
    # ------------------------------------------------------------------
    ap = argparse.ArgumentParser()
    ap.add_argument("-v", "--video", type=str, required=True,
                    help="Path to input video file")
    ap.add_argument("-a", "--area", type=str, default="",
                    help="Area (x1, y1, x2, y2) of the ROI in the video")
    ap.add_argument("-s", "--speed", type=str, default="1",
                    help="Manual video frame search speed")
    ap.add_argument("-o", "--out", type=str, required=False,
                    help="Output path")
    # BUGFIX: the original line was missing a comma (SyntaxError) and used
    # `type=bool`, which argparse evaluates as truthy for ANY non-empty
    # string; a boolean flag must be declared with action="store_true".
    # The help text was also a copy-paste of "Output path".
    ap.add_argument("-tr", "--tracker", action="store_true",
                    help="Track the ROI with a KCF tracker instead of a fixed box")
    args = vars(ap.parse_args())
    # BUGFIX: `args` is a plain dict after vars(); the original accessed
    # `args.tracker`, which raises AttributeError.
    if args["tracker"]:
        tracker = cv2.TrackerKCF_create()
    # parse the ROI given on the command line, if any; the shortest legal
    # spec is "0,0,1,1" (7 characters)
    if len(args.get('area')) < 7:
        initBB = None
    else:
        area = args.get('area').split(',')
        if len(area) != 4:
            print("area needs to be in form: x1,y1,x2,y2")
            sys.exit(-1)
        # convert the corner pair (x1,y1,x2,y2) -> OpenCV box (x,y,w,h)
        initBB = (int(area[0]), int(area[1]),
                  int(area[2]) - int(area[0]), int(area[3]) - int(area[1]))
    # if the ROI is unknown we must show the video so the user can pick one
    setArea = initBB is not None
    fps = int(args.get('speed'))
    filestem = PurePath(args.get('video')).stem
    if args.get('out'):
        csv_path = os.path.join(f'{args["out"]}', f'colors_{filestem}.csv')
    else:
        csv_path = f'colors_{filestem}.csv'
    # start the threaded frame reader and give it a moment to fill up
    fvs = FileVideoStream(args["video"], fps).start()
    time.sleep(1.0)
    vfps = round(fvs.stream.get(cv2.CAP_PROP_FPS))
    num_frames = int(fvs.stream.get(cv2.CAP_PROP_FRAME_COUNT))
    if not setArea:
        # --------------------------------------------------------------
        # Interactive ROI selection: 's' opens the selector, 'q' aborts
        # --------------------------------------------------------------
        while True:
            if not fvs.more():
                continue
            frame = fvs.read()
            # draw the current candidate box, if one exists
            if initBB is not None:
                x, y, w, h = initBB
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 200, 0), 2)
            cv2.imshow("Frame", frame)
            key = cv2.waitKey(1) & 0xFF
            if key == ord("s"):
                # select the ROI (press ENTER or SPACE to confirm)
                initBB = cv2.selectROI("Frame", frame, fromCenter=False,
                                       showCrosshair=True)
                break
            elif key == ord("q"):
                break
        cv2.destroyAllWindows()
        # BUGFIX: the original printed initBB unconditionally and crashed
        # with a TypeError when the user quit without selecting a ROI.
        if initBB is None:
            print("No ROI selected, exiting.")
            fvs.stop()
            sys.exit(0)
        print(f'Selected area: "{initBB[0]},{initBB[1]},'
              f'{initBB[0] + initBB[2]},{initBB[1] + initBB[3]}"')
    # ------------------------------------------------------------------
    # Now that we know the area, process the whole video sequentially
    # ------------------------------------------------------------------
    # stop the threaded reader and reopen the stream from the beginning
    fvs.stop()
    vs = cv2.VideoCapture(args["video"])
    time.sleep(1.0)
    s = time.time()
    T = 1 / vfps  # seconds per frame
    # BUGFIX: the original guarded this whole section with `if not
    # setArea:`, so supplying --area on the command line skipped the
    # extraction entirely; the video must be processed in both cases.
    with open(csv_path, "w") as out:
        out.write('time,b,g,r\n')
        for k in tqdm.tqdm(range(num_frames)):
            ret, frame = vs.read()
            if not ret:
                continue
            if k == 0:
                # first frame: initialize the tracker on the known ROI
                if args["tracker"]:
                    tracker.init(frame, initBB)
                success = True
                box = initBB
            elif args["tracker"]:
                (success, box) = tracker.update(frame)
            else:
                success = True
                box = initBB
            if success:
                # BUGFIX: tracker.update() returns float coordinates,
                # which numpy refuses as slice indices; truncate to int.
                x, y, w, h = (int(v) for v in box)
                roi = frame[y:(y + h), x:(x + w), :]
                # average over both image axes -> (b, g, r) channel means
                avg_color = roi.mean(axis=0).mean(axis=0)
                out.write(f'{k*T},{avg_color[0]},{avg_color[1]},{avg_color[2]}\n')
    e = time.time()
    print(f'started: {s}\nended : {e}')
    # release the sequential reader and plot the result
    vs.release()
    plot_result(csv_path, vfps)
    print('Done\n')