forked from JostTim/pImage
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwriters.py
269 lines (211 loc) · 8.93 KB
/
writers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# -*- coding: utf-8 -*-
"""Boilerplate:
A one line summary of the module or program, terminated by a period.
Rest of the description. Multiliner
<div id = "exclude_from_mkds">
Excluded doc
</div>
<div id = "content_index">
<div id = "contributors">
Created on Tue Oct 12 18:54:37 2021
@author: Timothe
</div>
"""
import os, warnings
import numpy as np
from cv2 import VideoWriter, VideoWriter_fourcc
def select_extension_writer(file_path):
    """
    Return the writer class matching the extension of ``file_path``.

    The extension is compared case-insensitively, so ``video.AVI`` selects the
    same writer as ``video.avi``.

    Parameters
    ----------
    file_path : str
        Path (or bare file name) of the output file. Only its extension is used.

    Returns
    -------
    type
        AviWriter for ".avi", MP4Writer for ".mp4" and ".m4v", MKVWriter for
        ".mkv", TiffWriter for ".tiff", GifWriter for ".gif".
        The class itself is returned, not an instance.

    Raises
    ------
    NotImplementedError
        If the extension is none of the above (including no extension at all).
    """
    # Split once and normalise the case, instead of re-splitting for every test.
    extension = os.path.splitext(file_path)[1].lower()
    if extension == ".avi":
        return AviWriter
    if extension in (".mp4", ".m4v"):
        return MP4Writer
    if extension == ".mkv":
        return MKVWriter
    if extension == ".tiff":
        return TiffWriter
    if extension == ".gif":
        return GifWriter
    raise NotImplementedError("File extension/CODEC not supported yet")
class AutoVideoWriter:
    """
    Factory front-end for the writers of this module.

    ``AutoVideoWriter(path, **kwargs)`` does not return an AutoVideoWriter :
    it returns an instance of the writer class that select_extension_writer
    picks for ``path``, built with the same ``path`` and keyword arguments.
    """

    def __new__(cls, path, **kwargs):
        # Pick the concrete class from the extension, then instantiate it right away.
        return select_extension_writer(path)(path, **kwargs)
class DefaultWriter:
    """
    Base class of the writers.

    Subclasses implement ``_write_frame`` (and ``open`` / ``close`` when they
    hold a resource). The context-manager protocol, ``write`` and
    ``write_from`` are provided here and rely on those methods.
    """

    ############## Methods that need to be overridden :

    def __init__(self, **kwargs):
        pass

    def _write_frame(self, array):
        # Subclasses implement the actual writing of one frame to the output file here.
        raise NotImplementedError

    ############## Methods to override if necessary :

    def open(self):
        pass

    def close(self):
        pass

    ############## Methods to keep :

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, type, value, traceback):
        # No exception handling here : the writer is closed and, as nothing is
        # returned, an exception raised inside the ``with`` body keeps propagating.
        self.close()

    def write_from(self, frame_yielder):
        """
        Write to disk every frame provided by an iterable.

        ``frame_yielder`` can be anything yielding valid image data, consistent
        from the first frame to the last one. It is intended to be used with a
        reader (reader = pImage.AutoVideoReader(videopath)), through
        reader.frames() (all the available frames) or reader.sequence(start,stop)
        (frames between the integers 'start' and 'stop').

        A small activity bar made of dots is printed while writing, to see
        whether the process is still alive.
        """
        dots = ''
        print("Writing")
        for frame_number, frame in enumerate(frame_yielder):
            # The activity bar is only refreshed every 10th frame.
            if frame_number % 10 == 0:
                if len(dots) > 6:
                    # Bar is full : blank it and start over.
                    print(' ', end='\r')
                    dots = ''
                else:
                    dots += '.'
                print(dots, end='\r')
            self.write(frame)
        print("Writing finished")

    def write(self, array):
        """Write one frame, delegating to the subclass ``_write_frame``."""
        self._write_frame(array)
class OpenCVWriter(DefaultWriter):
    """
    Writer that encodes frames into a video file through OpenCV's VideoWriter.
    """

    def __init__(self, path, **kwargs):
        """
        Store all the parameters needed to create a video.

        The video file is not opened here : it is opened the first time a frame
        is written, because the frame size is taken from that first frame.
        Once the video is written, call self.close() to release the handle over
        the file. It is called implicitly when the writer is used in a
        ``with OpenCVWriter(path) as writer:`` structure, which is advised for
        better stability.

        Parameters
        ----------
        path : str
            Path of the output file. Made absolute ; its parent directory is
            created if it does not exist.
        **kwargs :
            - root :
                optional directory joined in front of ``path``
            - filename :
                optional file name joined after ``path``
            - fps :
                playspeed of video (default 30)
            - codec :
                4 char string representing an existing CODEC installed on your machine
                "MJPG" is default and works great for .avi files
                List of FOURCC codecs :
                https://www.fourcc.org/codecs.php
            - dtype :
                dtype every frame is cast to before writing (default 'uint8')
            - rgbconv :
                suffix of the cv2 ``COLOR_*`` constant used to convert frames
                that have 3 dimensions (default "RGB2BGR"). None disables
                the conversion.

        Returns
        -------
        None.
        """
        filename = kwargs.get("filename", None)
        root = kwargs.get("root", None)
        if root is not None:
            path = os.path.join(root, path)
        if filename is not None:
            path = os.path.join(path, filename)
        path = os.path.abspath(path)
        # exist_ok : no failure if the directory appears between a check and the creation.
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self.path = path

        self.rgbconv = kwargs.get("rgbconv", "RGB2BGR")
        # /!\ : if data comes from matplotlib exported arrays, color layers are not right :
        # it is necessary to use rgbconv = "RGB2BGR"
        self.fps = kwargs.get("fps", 30)
        self.codec = kwargs.get("codec", "MJPG")
        self.dtype = kwargs.get("dtype", 'uint8')
        self.fourcc = VideoWriter_fourcc(*self.codec)
        self.file_handle = None  # created lazily by the first _write_frame call

    def _write_frame(self, array):
        """
        Write one frame, opening the video file first if this is the first frame.

        Raises
        ------
        ValueError
            If ``self.rgbconv`` does not name an existing cv2 ``COLOR_*`` constant.
        """
        # Local import : only VideoWriter and VideoWriter_fourcc are imported at module level.
        import cv2

        if self.file_handle is None:
            # VideoWriter expects (width, height) : numpy shape order is (rows, columns).
            self.size = np.shape(array)[1], np.shape(array)[0]
            # color is always True because gray data is expanded to 3 channels below.
            self.file_handle = VideoWriter(self.path, self.fourcc, self.fps, self.size, True)

        frame = array.astype(self.dtype)
        if len(frame.shape) < 3:
            # Gray data : duplicate it 3 times along a new last axis.
            frame = np.repeat(frame[:, :, np.newaxis], 3, axis=2)
        elif self.rgbconv is not None:
            # Attribute lookup instead of eval : no string built from a user
            # argument gets executed, and any cv2 conversion code can be used.
            conversion = getattr(cv2, f"COLOR_{self.rgbconv}", None)
            if conversion is None:
                raise ValueError(
                    f"Unknown color conversion 'COLOR_{self.rgbconv}' (rgbconv={self.rgbconv!r})"
                )
            frame = cv2.cvtColor(frame, conversion)

        self.file_handle.write(frame)

    def close(self):
        """Release the video file ; only warn if no frame was ever written."""
        if self.file_handle is None:
            warnings.warn("No data has been given, video was not created")
            return
        self.file_handle.release()
class AviWriter(OpenCVWriter):
    """
    OpenCV writer for .avi files ; the codec defaults to "MJPG".

    Its huge advantage is that the file gets terminated almost properly even if
    the writer is not closed. The quality reduction is low, which results in
    quite big videos compared to the other codecs. The size reduction is still
    orders of magnitude better than uncompressed data, as we usually record
    "static background" videos on our setups.
    """

    def __init__(self, path, **kwargs):
        super().__init__(path, **kwargs)
        # Re-apply the codec with this container's own default.
        codec_name = kwargs.get("codec", "MJPG")
        self.codec = codec_name
        self.fourcc = VideoWriter_fourcc(*codec_name)
class MP4Writer(OpenCVWriter):
    """
    OpenCV writer whose codec defaults to "mp4v" instead of the base default.
    """

    def __init__(self, path, **kwargs):
        super().__init__(path, **kwargs)
        # Re-apply the codec with this container's own default.
        codec_name = kwargs.get("codec", "mp4v")
        self.codec = codec_name
        self.fourcc = VideoWriter_fourcc(*codec_name)
class MKVWriter(OpenCVWriter):
    """
    OpenCV writer based on the DIVX codec (default codec "DIVX").

    From the selection of this lib, it is one of the most disk-space efficient
    ways to store videos, but it results in a loss of quality. Use it for
    presentations with light videos and such ; avoid it for further data analysis.
    """

    def __init__(self, path, **kwargs):
        super().__init__(path, **kwargs)
        # Re-apply the codec with this container's own default.
        codec_name = kwargs.get("codec", "DIVX")
        self.codec = codec_name
        self.fourcc = VideoWriter_fourcc(*codec_name)
class TiffWriter(DefaultWriter):
    """
    Writer that saves every frame to its own TIFF file.

    Files are named ``<prefix>_<index>.tiff``, where ``<prefix>`` is the file
    name of the given path without its extension and ``<index>`` is the frame
    number, zero-padded to 5 digits. They are written into the directory part
    of the given path.
    """

    # libtiff is optional : when it is missing, the ImportError is kept so that
    # it can be reported when a frame is actually written.
    try:
        from libtiff import TIFF as tiff_writer
    except ImportError as e:
        tiff_writer = e

    def __init__(self, path, **kwargs):
        """
        Parameters
        ----------
        path : str
            Template path : its directory part is the output directory, its
            file name (without extension) is the prefix of the written files.
        **kwargs :
            Accepted for signature compatibility with the other writers ; unused.
        """
        self.path = os.path.dirname(path)
        self.file_prefix = os.path.splitext(os.path.basename(path))[0]
        self.index = 0  # number of the next frame to write

    def _make_full_fullpath(self, index):
        """Return the path of the file for frame ``index``, creating the output directory if needed."""
        # self.path is "" when the given path had no directory part : files then
        # go to the current directory, and os.makedirs("") would raise.
        if self.path:
            os.makedirs(self.path, exist_ok=True)
        return os.path.join(self.path, self.file_prefix + f"_{str(index).zfill(5)}.tiff")

    def _write_frame(self, array):
        """
        Write ``array`` to the next numbered TIFF file.

        Raises
        ------
        ImportError
            If the libtiff package could not be imported.
        """
        if isinstance(self.tiff_writer, ImportError):
            raise ImportError("libtiff is required to write .tiff files") from self.tiff_writer
        _fullpath = self._make_full_fullpath(self.index)
        tiff_file = self.tiff_writer.open(_fullpath, mode="w")
        try:
            tiff_file.write_image(array)
        finally:
            # Release the handle even if the image could not be written.
            tiff_file.close()
        self.index += 1
class GifWriter(DefaultWriter):
    """
    Writer that builds an animated GIF with Pillow.

    Frames are kept in memory (``frame_bank``) as they are written ; the file
    itself is only produced by ``flush``, which ``close`` calls.
    """

    def __init__(self, path, **kwargs):
        """
        Parameters
        ----------
        path : str
            Path of the .gif file to create.
        **kwargs :
            - seamless :
                if True, the frames are appended a second time in reverse
                order, so that the animation plays back and forth (default False)
            - framerate :
                frames per second ; converted to a frame duration in
                milliseconds (default 33.3)
            - optimize :
                passed to Pillow's ``save`` (default True)
            - infinite :
                if True the animation loops forever, otherwise it plays once
                (default True)
        """
        self.path = path
        self.frame_bank = []
        self.seamless = kwargs.get("seamless", False)
        self.frameduration = 1000 / kwargs.get("framerate", 33.3)
        self.optimize = kwargs.get("optimize", True)
        self.infinite = kwargs.get("infinite", True)

    def _write_frame(self, array):
        """Convert ``array`` to a Pillow image and store it until ``flush``."""
        from PIL import Image as pillow_image
        self.frame_bank.append(pillow_image.fromarray(array))

    def flush(self):
        """Write all the stored frames to ``self.path`` ; only warn if there is none."""
        if not self.frame_bank:
            # Same policy as the video writers : closing an unused writer is not an error.
            warnings.warn("No data has been given, gif was not created")
            return

        first_frame = self.frame_bank[0]
        following_frames = self.frame_bank[1:]
        if self.seamless:
            # Play back from the last frame down to the second one.
            following_frames = following_frames + self.frame_bank[:0:-1]

        save_options = {
            "save_all": True,
            "append_images": following_frames,
            "duration": self.frameduration,
            "optimize": self.optimize,
        }
        if self.infinite:
            # Pillow's ``loop`` is a repeat count where 0 means forever ; leaving
            # it out makes the animation play once. Passing the boolean itself
            # gave the opposite result (True -> 1 loop, False -> 0 -> forever).
            save_options["loop"] = 0
        first_frame.save(self.path, **save_options)

    def close(self):
        """Write the GIF file from the frames received so far."""
        self.flush()