Skip to content

Commit

Permalink
feat: timetags for chosen channel
Browse files Browse the repository at this point in the history
added a function to get timetags for a chosen channel from the buffer
  • Loading branch information
Peter-Barrow committed Sep 12, 2024
1 parent 4f1aafd commit 591958a
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "tangy"
version = "0.9.0"
version = "0.9.1"
description = "Timetag analysing library"
authors = [
{name = "Peter Thomas Barrow", email = "[email protected]"}
Expand Down
4 changes: 2 additions & 2 deletions tangy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
if _uqd_lib is not None:
from ._uqd import UQDLogic16

if _qutag_lib is not None:
from ._qutag import QuTagHR, DeviceChannel, SignalEdge, SignalConditioning
# if _qutag_lib is not None:
# from ._qutag import QuTagHR, DeviceChannel, SignalEdge, SignalConditioning
85 changes: 81 additions & 4 deletions tangy_src/_tangy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from cython.cimports.libc.stdint import int64_t as i64
import struct
from typing import List, Tuple, Optional, Union
from numpy.typing import NDArray
from enum import Enum
from platformdirs import user_config_dir
from warnings import warn
Expand Down Expand Up @@ -870,12 +871,84 @@ def lower_bound(self, time: float) -> int:
index: u64n = _tangy.tangy_lower_bound(self._ptr_buf, bins)
return index

@cython.ccall
def timetags(self,
channels: Optional[Union[int, List[int]]] = None,
start: Optional[int] = None,
stop: Optional[int] = None) -> List[Tuple[int, NDArray]]:
""" Get the timetags for selected channels
Args:
channels (Optional[Union[int, List[int]]]): Channels to get
timetags for, can be either a single channel (int) or many
(list of ints). If no channels are set and value left as None,
all channels will gathered.
start (Optional[int]): Position in buffer to start collecting from,
if None the first valid position in the buffer will be used.
stop (Optional[int]): Position in the buffer to collect timetags
up to, if left as None the end of the buffer will be used.
Returns:
(List[Tuple[int, NDArray]]): a list of tuples, each list element
contains the channel index and ndarray of timetags for that
channel
Examples:
Get all timetags for channels 0 and 1
>>> timetags = buffer.timetags(channels=[0, 1])
Get the first 10 timetags on channel 2
>>> start = buffer.begin
>>> stop = start + 10
>>> channel_target = 2
>>> timetags_channel_2 = timetags(channels=channel_target,
start=start,
stop=stop)
Get the last 100 timetags on channel 4
>>> stop = buffer.end
>>> stop = stop - 100
>>> timetags_channel_2 = timetags(channels=4,
start=start,
stop=stop)
"""

if start is None:
start = self.begin

if stop is None:
stop = self.end

if channels is None:
channels = [c for c in range(self.channel_count)]

if type(channels) is int:
channels = [channels]

tag_arrays = []
if self._buf.format == _tangy.buffer_format.STANDARD:
(ch, tt) = self[start:stop]
for c in channels:
channel_mask = ch == c
if channel_mask.sum() != 0:
tag_arrays.append((c, tt[channel_mask]))

if self._buf.format == _tangy.buffer_format.CLOCKED:
(ch, (cl, dt)) = self[start:stop]
for c in channels:
channel_mask = ch == c
if channel_mask != 0:
tag_arrays.append(
(c, (cl[channel_mask], dt[channel_mask])))

return tag_arrays

@cython.ccall
def singles(self, read_time: Optional[float] = None,
start: Optional[int] = None,
stop: Optional[int] = None) -> Tuple[int, List[int]]:
"""Count the occurances of each channel over a region of the buffer
Args:
buffer (RecordBuffer): Buffer containing timetags
read_time (Optional[float] = None): Length of time to integrate over
Expand Down Expand Up @@ -1132,11 +1205,15 @@ def relative_delay(self, channel_a: int, channel_b: int, read_time: float,
# PERF: replace time trace with singles()
# NOTE: decay (tau) is ≈ 1 / singles(channelX), so we have:
# tau_a = 1 / singles(channel_a) and tau_b = 1 / singles(channel_b)
resolution_trace: f64n = 1.0
trace: u64n[:] = self.timetrace(
[channel_a, channel_b], read_time, resolution_trace)
# resolution_trace: f64n = 1.0
# trace: u64n[:] = self.timetrace(
# [channel_a, channel_b], read_time, resolution_trace)

(_, counts_singles) = self.singles(read_time)
intensity_average = (
counts_singles[channel_a] + counts_singles[channel_b]) / read_time

intensity_average = mean(trace) / resolution_trace
# intensity_average = mean(trace) / resolution_trace

correlation_window: f64n = window
if window is None:
Expand Down

0 comments on commit 591958a

Please sign in to comment.