-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwhat.py
executable file
·173 lines (132 loc) · 6.15 KB
/
what.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: Not titled yet
# GNU Radio version: 3.10.4.0
from gnuradio import analog
import math
from gnuradio import blocks
from gnuradio import digital
from gnuradio import filter
from gnuradio.filter import firdes
from gnuradio import gr
from gnuradio.fft import window
import sys
import signal
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
from gnuradio import soapy
import what_epy_block_0 as epy_block_0 # embedded python block
# Module-level switch: when truthy at the time `what` is constructed, the flow
# graph also builds and connects a file sink for the source samples.
# main() overwrites it from the command line before building the flow graph.
recordingToFile = 0
class what(gr.top_block):
    """Flow graph that reads an RTL-SDR through SoapySDR and feeds an embedded block.

    Wiring, as connected in ``__init__``::

        soapy source -> freq_xlating_fir_filter_xxx_1 -> freq_xlating_fir_filter_xxx_0
        freq_xlating_fir_filter_xxx_0 -> quadrature demod -> binary slicer -> epy_block_0 (in 0)
        freq_xlating_fir_filter_xxx_0 -> |x|^2 -> multiply by 100 -> moving average -> epy_block_0 (in 1)
        epy_block_0 -> null sink

    When the module-level ``recordingToFile`` flag is truthy at construction
    time, the source is additionally connected to a file sink writing to
    ``self.file_name``.
    """

    def __init__(self):
        """Create all blocks and connect them.

        Raises:
            SystemExit: with status 1 if ``soapy.source`` raises ``RuntimeError``
                (the message printed asks the user to plug in the SDR).
        """
        # Local imports keep this block self-contained.
        import os
        import time

        gr.top_block.__init__(self, "Not titled yet", catch_exceptions=True)

        ##################################################
        # Variables
        ##################################################
        self.samp_rate = samp_rate = 2000000
        self.update_rate = 0.00001
        self.num_pts = int(samp_rate * 0.2)
        self.fsk_deviation_hz = 20000
        self.fname = "firstrealpls"
        # Keep the attribute in sync with the path actually handed to the file
        # sink so get_file_name() reports the real recording file.
        self.file_name = file_name = "recordings/" + time.strftime("%Y-%m-%d-%H-%M-%S.raw")
        # Snapshot of the module-level flag; decides whether a file sink exists.
        recording = bool(recordingToFile)

        ##################################################
        # Blocks
        ##################################################
        self.soapy_rtlsdr_source_0 = None
        dev = 'driver=rtlsdr'
        stream_args = ''
        tune_args = ['']
        settings = ['']
        try:
            self.soapy_rtlsdr_source_0 = soapy.source(dev, "fc32", 1, '', stream_args, tune_args, settings)
        except RuntimeError as e:
            print(f"plug in your sdr! [{e}]")
            # sys.exit instead of the interactive-only quit(); non-zero status
            # so callers can tell the device was missing.
            sys.exit(1)
        self.soapy_rtlsdr_source_0.set_sample_rate(0, samp_rate)
        self.soapy_rtlsdr_source_0.set_gain_mode(0, False)
        self.soapy_rtlsdr_source_0.set_frequency(0, 450003000)
        self.soapy_rtlsdr_source_0.set_frequency_correction(0, 0)
        self.soapy_rtlsdr_source_0.set_gain(0, 'TUNER', 30)

        # Second stage: runs at samp_rate/4, i.e. after the first stage below.
        self.freq_xlating_fir_filter_xxx_0 = filter.freq_xlating_fir_filter_ccc(
            1, self._channel_taps(), 0, samp_rate / 4)
        self.epy_block_0 = epy_block_0.blk()
        self.digital_binary_slicer_fb_0 = digital.binary_slicer_fb()
        self.blocks_null_sink_0 = blocks.null_sink(gr.sizeof_char * 1)
        self.blocks_multiply_const_xx_0 = blocks.multiply_const_ff(100, 1)
        self.blocks_moving_average_xx_0 = blocks.moving_average_ff(10000, 1, 10000, 1)
        # First stage: constructed with decimation argument 4 at the full rate.
        self.freq_xlating_fir_filter_xxx_1 = filter.freq_xlating_fir_filter_ccc(
            4, self._front_end_taps(), 0, samp_rate)
        if recording:
            # The sink writes inside "recordings/"; make sure it exists first.
            os.makedirs(os.path.dirname(file_name), exist_ok=True)
            self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_gr_complex * 1, file_name, False)
            self.blocks_file_sink_0.set_unbuffered(False)
        self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(1)
        self.analog_quadrature_demod_cf_0 = analog.quadrature_demod_cf(self._demod_gain())

        ##################################################
        # Connections
        ##################################################
        self.connect((self.analog_quadrature_demod_cf_0, 0), (self.digital_binary_slicer_fb_0, 0))
        self.connect((self.blocks_complex_to_mag_squared_0, 0), (self.blocks_multiply_const_xx_0, 0))
        self.connect((self.blocks_moving_average_xx_0, 0), (self.epy_block_0, 1))
        self.connect((self.blocks_multiply_const_xx_0, 0), (self.blocks_moving_average_xx_0, 0))
        self.connect((self.digital_binary_slicer_fb_0, 0), (self.epy_block_0, 0))
        self.connect((self.epy_block_0, 0), (self.blocks_null_sink_0, 0))
        self.connect((self.freq_xlating_fir_filter_xxx_0, 0), (self.analog_quadrature_demod_cf_0, 0))
        self.connect((self.freq_xlating_fir_filter_xxx_0, 0), (self.blocks_complex_to_mag_squared_0, 0))
        if recording:
            self.connect((self.soapy_rtlsdr_source_0, 0), (self.blocks_file_sink_0, 0))
        self.connect((self.soapy_rtlsdr_source_0, 0), (self.freq_xlating_fir_filter_xxx_1, 0))
        self.connect((self.freq_xlating_fir_filter_xxx_1, 0), (self.freq_xlating_fir_filter_xxx_0, 0))

    def _front_end_taps(self):
        """Return low-pass taps for the first filter, designed at ``samp_rate``."""
        return firdes.low_pass(1.0, self.samp_rate, 300000, 100000)

    def _channel_taps(self):
        """Return low-pass taps for the second filter, designed at ``samp_rate / 4``."""
        return firdes.low_pass(1.0, self.samp_rate / 4, 37500, 5000)

    def _demod_gain(self):
        """Return the quadrature-demod gain derived from sample rate and FSK deviation."""
        return 3 * (self.samp_rate / (2 * math.pi * self.fsk_deviation_hz))

    def get_samp_rate(self):
        """Return the current sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store a new sample rate and push it to every block that depends on it."""
        self.samp_rate = samp_rate
        self.set_num_pts(int(self.samp_rate * 0.2))
        self.analog_quadrature_demod_cf_0.set_gain(self._demod_gain())
        # Redesign both filters with the same rates the constructor used
        # (full rate for stage 1, quarter rate for stage 0).
        self.freq_xlating_fir_filter_xxx_1.set_taps(self._front_end_taps())
        self.freq_xlating_fir_filter_xxx_0.set_taps(self._channel_taps())
        self.soapy_rtlsdr_source_0.set_sample_rate(0, self.samp_rate)

    def get_update_rate(self):
        """Return the stored update rate."""
        return self.update_rate

    def set_update_rate(self, update_rate):
        """Store a new update rate (no block reads it in this class)."""
        self.update_rate = update_rate

    def get_num_pts(self):
        """Return the stored number of points."""
        return self.num_pts

    def set_num_pts(self, num_pts):
        """Store a new number of points (no block reads it in this class)."""
        self.num_pts = num_pts

    def get_fsk_deviation_hz(self):
        """Return the FSK deviation in Hz."""
        return self.fsk_deviation_hz

    def set_fsk_deviation_hz(self, fsk_deviation_hz):
        """Store a new FSK deviation and update the demodulator gain."""
        self.fsk_deviation_hz = fsk_deviation_hz
        self.analog_quadrature_demod_cf_0.set_gain(self._demod_gain())

    def get_fname(self):
        """Return the stored ``fname`` value."""
        return self.fname

    def set_fname(self, fname):
        """Store a new ``fname`` value (no block reads it in this class)."""
        self.fname = fname

    def get_file_name(self):
        """Return the path used for the recording file sink."""
        return self.file_name

    def set_file_name(self, file_name):
        """Store a new recording path and reopen the file sink if one exists."""
        self.file_name = file_name
        # The sink is only created when recording was enabled at construction.
        sink = getattr(self, "blocks_file_sink_0", None)
        if sink is not None:
            sink.open(self.file_name)
def main(top_block_cls=what, options=None):
    """Build the flow graph, run it, and stop cleanly on SIGINT/SIGTERM.

    Recording is enabled when the first command-line argument is exactly
    ``--record``; the result is stored in the module-level ``recordingToFile``
    flag before the flow graph is constructed.

    Args:
        top_block_cls: Class instantiated with no arguments to obtain the
            flow graph; it must provide ``start``, ``stop`` and ``wait``.
        options: Unused; kept so existing callers keep working.
    """
    global recordingToFile
    # Slice comparison looks only at a real first argument: with no arguments
    # it is False (never compares against the script path in argv[0]) and it
    # cannot raise IndexError on an empty argv.
    recordingToFile = sys.argv[1:2] == ["--record"]
    if recordingToFile:
        rule = "-" * 100
        print(rule)
        # 96-column centred label between "| " and " |" -> 100 columns total.
        print("| " + "RECORDING IQ".center(96) + " |")
        print(rule)

    tb = top_block_cls()

    def sig_handler(sig=None, frame=None):
        """Stop the flow graph, wait for it to finish, then exit with status 0."""
        tb.stop()
        tb.wait()
        sys.exit(0)

    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    tb.start()
    tb.wait()
# Script entry point: run the flow graph only when executed directly.
if __name__ == "__main__":
    main()