mirror of
https://github.com/Astatin3/optical-flow-outliar.git
synced 2026-06-08 16:18:09 -06:00
Some Stuff
This commit is contained in:
@@ -0,0 +1,96 @@
|
|||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
def process_video(video_path, scale_factor=0.5, min_area=500):
|
||||||
|
"""
|
||||||
|
Process video for motion detection.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
video_path: Path to input video
|
||||||
|
output_path: Path to save processed video
|
||||||
|
scale_factor: Factor to downscale the frames
|
||||||
|
min_area: Minimum contour area to be considered as motion
|
||||||
|
"""
|
||||||
|
# Open video
|
||||||
|
cap = cv2.VideoCapture(video_path)
|
||||||
|
if not cap.isOpened():
|
||||||
|
raise ValueError("Error opening video file")
|
||||||
|
|
||||||
|
# Get video properties
|
||||||
|
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||||
|
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||||
|
fps = int(cap.get(cv2.CAP_PROP_FPS))
|
||||||
|
|
||||||
|
# Create video writer
|
||||||
|
# fourcc = cv2.VideoWriter_fourcc(*'mp4v')
|
||||||
|
# out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
|
||||||
|
|
||||||
|
# Read first frame
|
||||||
|
ret, prev_frame = cap.read()
|
||||||
|
if not ret:
|
||||||
|
raise ValueError("Error reading first frame")
|
||||||
|
|
||||||
|
# Process first frame
|
||||||
|
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
|
||||||
|
prev_small = cv2.resize(prev_gray, None, fx=scale_factor, fy=scale_factor)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# Read current frame
|
||||||
|
ret, curr_frame = cap.read()
|
||||||
|
if not ret:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Convert to grayscale
|
||||||
|
curr_gray = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)
|
||||||
|
|
||||||
|
# Downscale
|
||||||
|
curr_small = cv2.resize(curr_gray, None, fx=scale_factor, fy=scale_factor)
|
||||||
|
|
||||||
|
# Calculate absolute difference
|
||||||
|
frame_diff = cv2.absdiff(curr_small, prev_small)
|
||||||
|
|
||||||
|
# Apply threshold to difference
|
||||||
|
_, thresh = cv2.threshold(frame_diff, 50, 255, cv2.THRESH_BINARY)
|
||||||
|
|
||||||
|
# Dilate to fill in holes
|
||||||
|
kernel = np.ones((3,3), np.uint8)
|
||||||
|
dilated = cv2.dilate(thresh, kernel, iterations=2)
|
||||||
|
|
||||||
|
# Scale back up to original size
|
||||||
|
motion_mask = cv2.resize(dilated, (frame_width, frame_height))
|
||||||
|
|
||||||
|
# Find contours
|
||||||
|
contours, _ = cv2.findContours(motion_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||||
|
|
||||||
|
# Draw motion areas on original frame
|
||||||
|
for contour in contours:
|
||||||
|
if cv2.contourArea(contour) > min_area:
|
||||||
|
x, y, w, h = cv2.boundingRect(contour)
|
||||||
|
cv2.rectangle(curr_frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
|
||||||
|
|
||||||
|
# Write frame to output video
|
||||||
|
cv2.imshow("e",curr_frame)
|
||||||
|
|
||||||
|
|
||||||
|
# Exit if 'q' is pressed
|
||||||
|
if cv2.waitKey(30) & 0xFF == ord('q'):
|
||||||
|
break
|
||||||
|
|
||||||
|
# Update previous frame
|
||||||
|
prev_small = curr_small
|
||||||
|
|
||||||
|
# Release resources
|
||||||
|
cap.release()
|
||||||
|
cv2.destroyAllWindows()
|
||||||
|
|
||||||
|
def main():
|
||||||
|
# Example usage
|
||||||
|
input_video = 0
|
||||||
|
try:
|
||||||
|
process_video(input_video)
|
||||||
|
print("Motion detection completed successfully")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error processing video: {str(e)}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
+143
@@ -0,0 +1,143 @@
|
|||||||
|
import numpy as np
|
||||||
|
import cv2
|
||||||
|
|
||||||
|
def detect_feature_points(frame, max_corners=1000):
|
||||||
|
"""
|
||||||
|
Detect good features to track in the frame.
|
||||||
|
"""
|
||||||
|
# Convert frame to grayscale if it's not already
|
||||||
|
if len(frame.shape) == 3:
|
||||||
|
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||||
|
else:
|
||||||
|
gray = frame
|
||||||
|
|
||||||
|
# Detect corners using Shi-Tomasi method
|
||||||
|
corners = cv2.goodFeaturesToTrack(
|
||||||
|
gray,
|
||||||
|
maxCorners=max_corners,
|
||||||
|
qualityLevel=0.001,
|
||||||
|
minDistance=10,
|
||||||
|
blockSize=7
|
||||||
|
)
|
||||||
|
|
||||||
|
return corners
|
||||||
|
|
||||||
|
def calculate_optical_flow(prev_frame, curr_frame, prev_points):
|
||||||
|
"""
|
||||||
|
Calculate optical flow for given points between two frames.
|
||||||
|
"""
|
||||||
|
# Convert frames to grayscale
|
||||||
|
if len(prev_frame.shape) == 3:
|
||||||
|
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
|
||||||
|
curr_gray = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)
|
||||||
|
else:
|
||||||
|
prev_gray = prev_frame
|
||||||
|
curr_gray = curr_frame
|
||||||
|
|
||||||
|
# Calculate optical flow using Lucas-Kanade method
|
||||||
|
curr_points, status, error = cv2.calcOpticalFlowPyrLK(
|
||||||
|
prev_gray,
|
||||||
|
curr_gray,
|
||||||
|
prev_points,
|
||||||
|
None,
|
||||||
|
winSize=(15, 15),
|
||||||
|
maxLevel=2,
|
||||||
|
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Filter out points where flow wasn't found
|
||||||
|
good_new = curr_points[status == 1]
|
||||||
|
good_old = prev_points[status == 1]
|
||||||
|
|
||||||
|
return good_new, good_old
|
||||||
|
|
||||||
|
def estimate_camera_motion(prev_points, curr_points, threshold=5.0):
|
||||||
|
"""
|
||||||
|
Estimate camera motion and identify outlier points.
|
||||||
|
Returns mask of points that don't follow the dominant motion pattern.
|
||||||
|
"""
|
||||||
|
# Calculate motion vectors
|
||||||
|
motion_vectors = curr_points - prev_points
|
||||||
|
|
||||||
|
# Calculate median motion as an estimate of camera motion
|
||||||
|
median_motion = np.median(motion_vectors, axis=0)
|
||||||
|
|
||||||
|
# Calculate the difference from median motion for each point
|
||||||
|
motion_differences = np.linalg.norm(motion_vectors - median_motion, axis=1)
|
||||||
|
|
||||||
|
# Calculate the median absolute deviation (MAD)
|
||||||
|
mad = np.median(np.abs(motion_differences - np.median(motion_differences)))
|
||||||
|
|
||||||
|
# Points with motion significantly different from the camera motion
|
||||||
|
# are considered outliers (using modified z-score)
|
||||||
|
outliers_mask = motion_differences > (threshold * mad)
|
||||||
|
|
||||||
|
return outliers_mask
|
||||||
|
|
||||||
|
def analyze_motion(video_path):
|
||||||
|
"""
|
||||||
|
Analyze motion in video and detect objects moving differently from camera motion.
|
||||||
|
"""
|
||||||
|
cap = cv2.VideoCapture(video_path)
|
||||||
|
|
||||||
|
# Read first frame
|
||||||
|
ret, prev_frame = cap.read()
|
||||||
|
if not ret:
|
||||||
|
raise ValueError("Could not read video")
|
||||||
|
|
||||||
|
prev_points = None
|
||||||
|
|
||||||
|
while True:
|
||||||
|
ret, curr_frame = cap.read()
|
||||||
|
if not ret:
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
# Detect initial points
|
||||||
|
if prev_points is None:
|
||||||
|
prev_points = detect_feature_points(prev_frame)
|
||||||
|
elif curr_points.shape[0] < 600:
|
||||||
|
prev_points = detect_feature_points(prev_frame)
|
||||||
|
|
||||||
|
|
||||||
|
print(prev_points.shape)
|
||||||
|
|
||||||
|
# Calculate optical flow
|
||||||
|
curr_points, prev_points_matched = calculate_optical_flow(
|
||||||
|
prev_frame, curr_frame, prev_points
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(curr_points) > 0 and len(prev_points_matched) > 0:
|
||||||
|
# Find points not moving with camera
|
||||||
|
outliers_mask = estimate_camera_motion(prev_points_matched, curr_points)
|
||||||
|
|
||||||
|
# Visualize results
|
||||||
|
frame_vis = curr_frame.copy()
|
||||||
|
|
||||||
|
# Draw all tracked points
|
||||||
|
for i, (new, old) in enumerate(zip(curr_points, prev_points_matched)):
|
||||||
|
a, b = new.ravel()
|
||||||
|
c, d = old.ravel()
|
||||||
|
|
||||||
|
# Draw line between old and new position
|
||||||
|
color = (0, 0, 255) if outliers_mask[i] else (0, 255, 0)
|
||||||
|
cv2.line(frame_vis, (int(c), int(d)), (int(a), int(b)), color, 2)
|
||||||
|
cv2.circle(frame_vis, (int(a), int(b)), 3, color, -1)
|
||||||
|
|
||||||
|
cv2.imshow('Frame', frame_vis)
|
||||||
|
|
||||||
|
# Exit if 'q' is pressed
|
||||||
|
if cv2.waitKey(30) & 0xFF == ord('q'):
|
||||||
|
break
|
||||||
|
|
||||||
|
# Update for next iteration
|
||||||
|
prev_frame = curr_frame.copy()
|
||||||
|
prev_points = curr_points.reshape(-1, 1, 2)
|
||||||
|
|
||||||
|
cap.release()
|
||||||
|
cv2.destroyAllWindows()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Example usage
|
||||||
|
video_path = 0
|
||||||
|
analyze_motion(video_path)
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
import cv2
|
||||||
|
|
||||||
|
cap = cv2.VideoCapture(0)
|
||||||
|
while True:
|
||||||
|
ret, frame = cap.read()
|
||||||
|
with open('/dev/fb0', 'rb+') as buf:
|
||||||
|
buf.write(frame)
|
||||||
|
cap.release()
|
||||||
Reference in New Issue
Block a user