diff --git a/btplotting/analyzers/plot.py b/btplotting/analyzers/plot.py index d5cd713..701f5de 100644 --- a/btplotting/analyzers/plot.py +++ b/btplotting/analyzers/plot.py @@ -19,12 +19,13 @@ class LivePlotAnalyzer(bt.Analyzer): params = ( ('scheme', Blackly()), ('style', 'bar'), - ('lookback', 23), + ('lookback', 0), ('address', 'localhost'), - ('port', 80), + ('port', 1234), ('title', None), - ('interval', 0.2), + ('interval', 0.1), ('paused_at_beginning', False), + ('debug', False) ) def __init__(self, iplot=True, autostart=False, **kwargs): diff --git a/btplotting/live/client.py b/btplotting/live/client.py index 7483532..08d06cb 100644 --- a/btplotting/live/client.py +++ b/btplotting/live/client.py @@ -5,6 +5,7 @@ from bokeh.layouts import column, row, layout from bokeh.models import Select, Spacer, Tabs, Button, Slider +from bokeh.document import without_document_lock from .datahandler import LiveDataHandler from ..tabs import ConfigTab @@ -14,14 +15,13 @@ class LiveClient: - ''' LiveClient provides live plotting functionality. ''' NAV_BUTTON_WIDTH = 35 - def __init__(self, doc, app, strategy, lookback, paused_at_beginning, interval=0.5): + def __init__(self, doc, app, strategy, lookback, paused_at_beginning, interval=0.01): self._app = app self._doc = doc self._strategy = strategy @@ -67,10 +67,42 @@ def _t_thread(self): if len(self._strategy) == self._lastlen: continue self._lastlen = len(self._strategy) - self._datahandler.update() - self.refresh() + if self._doc.session_context: + self._doc.add_next_tick_callback(self.safe_update) time.sleep(self._interval) + def rate_limited(max_per_second): + min_interval = 1.0 / float(max_per_second) + def decorate(func): + last_time_called = [0.0] + @wraps(func) + def rate_limited_function(*args, **kwargs): + elapsed = time.time() - last_time_called[0] + left_to_wait = min_interval - elapsed + if left_to_wait > 0: + time.sleep(left_to_wait) + ret = func(*args, **kwargs) + last_time_called[0] = time.time() + return ret + return rate_limited_function + return decorate + + + @without_document_lock + def safe_update(self): + try: + # Instead of directly updating, schedule the update for the next tick + self._doc.add_next_tick_callback(self._perform_update) + except Exception as e: + _logger.error(f"Error during safe_update: {e}") + + def _perform_update(self): + try: + self._datahandler.update() + self.refresh() + except Exception as e: + _logger.error(f"Error during _perform_update: {e}") + def _createmodel(self): def on_select_filterdata(self, a, old, new): @@ -85,24 +117,23 @@ def on_click_nav_action(self): self._pause() else: self._resume() - update_nav_buttons(self) + self._doc.add_next_tick_callback(partial(update_nav_buttons, self)) def on_click_nav_prev(self, steps=1): self._pause() - self._set_data_by_idx(self._datahandler.get_last_idx() - steps) - update_nav_buttons(self) + self._doc.add_next_tick_callback(partial(self._set_data_by_idx, self._datahandler.get_last_idx() - steps)) + self._doc.add_next_tick_callback(partial(update_nav_buttons, self)) def on_click_nav_next(self, steps=1): self._pause() - self._set_data_by_idx(self._datahandler.get_last_idx() + steps) - update_nav_buttons(self) + self._doc.add_next_tick_callback(partial(self._set_data_by_idx, self._datahandler.get_last_idx() + steps)) + self._doc.add_next_tick_callback(partial(update_nav_buttons, self)) def refresh(self, now=False): if now: update_nav_buttons(self) else: - self._doc.add_next_tick_callback( - partial(update_nav_buttons, self)) + self._doc.add_next_tick_callback(partial(update_nav_buttons, self)) def reset_nav_buttons(self): btn_nav_prev.disabled = True @@ -112,26 +143,29 @@ def reset_nav_buttons(self): btn_nav_action.label = '❙❙' def update_nav_buttons(self): - last_idx = self._datahandler.get_last_idx() - last_avail_idx = self._app.get_last_idx(self._figid) - - if self._paused: - btn_nav_action.label = '▶' - else: - btn_nav_action.label = '❙❙' - - if last_idx < self.lookback: - btn_nav_prev.disabled = True - btn_nav_prev_big.disabled = True - else: - btn_nav_prev.disabled = False - btn_nav_prev_big.disabled = False - if last_idx >= last_avail_idx: - btn_nav_next.disabled = True - btn_nav_next_big.disabled = True - else: - btn_nav_next.disabled = False - btn_nav_next_big.disabled = False + try: + last_idx = self._datahandler.get_last_idx() + last_avail_idx = self._app.get_last_idx(self._figid) + + if self._paused: + btn_nav_action.label = '▶' + else: + btn_nav_action.label = '❙❙' + + if last_idx < self.lookback: + btn_nav_prev.disabled = True + btn_nav_prev_big.disabled = True + else: + btn_nav_prev.disabled = False + btn_nav_prev_big.disabled = False + if last_idx >= last_avail_idx: + btn_nav_next.disabled = True + btn_nav_next_big.disabled = True + else: + btn_nav_next.disabled = False + btn_nav_next_big.disabled = False + except Exception as e: + _logger.error(f"Error during update_nav_buttons: {e}") # filter selection datanames = get_datanames(self._strategy) @@ -190,7 +224,7 @@ def update_nav_buttons(self): ], sizing_mode='stretch_width') - # return model and a refrash function + # return model and a refresh function return model, partial(refresh, self) def _get_filterdata(self): @@ -202,26 +236,24 @@ def _get_filterdata(self): return res def _get_tabs(self): - # return self.model.select_one({'id': 'tabs'}) return self.model.select_one({'type': Tabs}) def _set_data_by_idx(self, idx=None): - # if a index is provided, ensure that index is within data range - if idx: - # don't allow idx to be smaller than lookback - 1 - idx = max(idx, self.lookback - 1) - # don't allow idx to be bigger than max idx - last_avail_idx = self._app.get_last_idx(self._figid) - idx = min(idx, last_avail_idx) - - clk = self._figurepage.data_clock._get_clk() - # create DataFrame based on last index with length of lookback - end = self._figurepage.data_clock.get_dt_at_idx(clk, idx) - df = self._app.get_data( - end=end, - figid=self._figid, - back=self.lookback) - self._datahandler.set_df(df) + try: + if idx: + idx = max(idx, self.lookback - 1) + last_avail_idx = self._app.get_last_idx(self._figid) + idx = min(idx, last_avail_idx) + + clk = self._figurepage.data_clock._get_clk() + end = self._figurepage.data_clock.get_dt_at_idx(clk, idx) + df = self._app.get_data( + end=end, + figid=self._figid, + back=self.lookback) + self._datahandler.set_df(df) + except Exception as e: + _logger.error(f"Error during _set_data_by_idx: {e}") def _pause(self): self._paused = True @@ -247,31 +279,43 @@ def is_paused(self): return self._paused def refresh(self): - if self._refresh_fnc: - self._refresh_fnc(False) + try: + if self._refresh_fnc: + self._refresh_fnc(False) + except Exception as e: + _logger.error(f"Error during refresh: {e}") def refreshmodel(self): - if self._datahandler is not None: - self._datahandler.stop() - self._app.update_figurepage(filterdata=self._get_filterdata()) - self._datahandler = LiveDataHandler(self) - tab_panels = self._app.generate_bokeh_model_tab_panels() - for t in self._app.tabs: - tab = t(self._app, self._figurepage, self) - if tab.is_useable(): - tab_panels.append(tab.get_tab_panel()) - self._get_tabs().tabs = list(filter(None.__ne__, tab_panels)) - self.refresh() + try: + if self._datahandler is not None: + self._datahandler.stop() + self._app.update_figurepage(filterdata=self._get_filterdata()) + self._datahandler = LiveDataHandler(self) + tab_panels = self._app.generate_bokeh_model_tab_panels() + for t in self._app.tabs: + tab = t(self._app, self._figurepage, self) + if tab.is_useable(): + tab_panels.append(tab.get_tab_panel()) + self._get_tabs().tabs = list(filter(None.__ne__, tab_panels)) + self.refresh() + except Exception as e: + _logger.error(f"Error during refreshmodel: {e}") def next(self): - if self._interval != 0: - return - if len(self._strategy) == self._lastlen: - return - self._lastlen = len(self._strategy) - self._datahandler.update() + try: + if self._interval != 0: + return + if len(self._strategy) == self._lastlen: + return + self._lastlen = len(self._strategy) + self._datahandler.update() + except Exception as e: + _logger.error(f"Error during next: {e}") def stop(self): - self._running = False - self._thread.join() - self._datahandler.stop() + try: + self._running = False + self._thread.join() + self._datahandler.stop() + except Exception as e: + _logger.error(f"Error during stop: {e}") diff --git a/btplotting/live/datahandler.py b/btplotting/live/datahandler.py index 80691d1..dea20cc 100644 --- a/btplotting/live/datahandler.py +++ b/btplotting/live/datahandler.py @@ -25,67 +25,75 @@ def __init__(self, client): @gen.coroutine def _cb_push(self): - ''' - Pushes to all ColumnDataSources - ''' fp = self._client.get_figurepage() - # get all rows to patch - patches = {} - for idx in list(self._patches.keys()): - try: - patch = self._patches.pop(idx) - patches[idx] = patch - except KeyError: - continue + _logger.debug(f"Starting _cb_push. Current datastore shape: {self._datastore.shape}") + _logger.debug(f"Current datastore index: {self._datastore.index}") + + def log_cds_state(cds, message): + lengths = {col: len(data) for col, data in cds.data.items()} + _logger.debug(f"{message} - CDS lengths: {lengths}") + if len(set(lengths.values())) > 1: + _logger.warning(f"{message} - CDS columns have different lengths!") + + log_cds_state(fp.cds, "Initial state") + + patches = {idx: self._patches.pop(idx) for idx in list(self._patches.keys())} + _logger.debug(f"Patches to apply: {len(patches)}") - # patch figurepage for idx, patch in patches.items(): + _logger.debug(f"Processing patch for index {idx}") p_data, s_data = fp.get_cds_patchdata_from_series(idx, patch) - if len(p_data) > 0: - _logger.debug(f'Sending patch for figurepage: {p_data}') + + if p_data: + _logger.debug(f"Patch data for index {idx}: {p_data}") fp.cds.patch(p_data) - if len(s_data) > 0: - _logger.debug(f'Sending stream for figurepage: {s_data}') + log_cds_state(fp.cds, f"After patching index {idx}") + + if s_data: + _logger.debug(f"Stream data for index {idx}: {s_data}") fp.cds.stream(s_data, self._get_data_stream_length()) - # patch all figures + log_cds_state(fp.cds, f"After streaming index {idx}") + for f in fp.figures: - # only fill with nan if not filling gaps fillnan = f.fillnan() - # get patch data - p_data, s_data = f.get_cds_patchdata_from_series( - idx, patch, fillnan) - if len(p_data) > 0: - _logger.debug(f'Sending patch for figure: {p_data}') + p_data, s_data = f.get_cds_patchdata_from_series(idx, patch, fillnan) + + if p_data: + _logger.debug(f"Figure patch data for index {idx}: {p_data}") f.cds.patch(p_data) - if len(s_data) > 0: - _logger.debug(f'Sending stream for figure: {s_data}') + log_cds_state(f.cds, f"After patching figure for index {idx}") + + if s_data: + _logger.debug(f"Figure stream data for index {idx}: {s_data}") f.cds.stream(s_data, self._get_data_stream_length()) - self._lastidx = s_data['index'][-1] + log_cds_state(f.cds, f"After streaming figure for index {idx}") + self._lastidx = s_data['index'][-1] if 'index' in s_data else self._lastidx - ''' - # take all rows from datastore that were not yet streamed - update_df = self._datastore[self._datastore.index >= self._lastidx] - if not update_df.shape[0]: - return - - # store last index of streamed data - self._lastidx = update_df.index[-1] - - # create stream data for figurepage - data = fp.get_cds_streamdata_from_df(update_df) - if data: - _logger.debug(f'Sending stream for figurepage: {data}') - fp.cds.stream(data, self._get_data_stream_length()) - - # create stream df for every figure - for f in fp.figures: - data = f.get_cds_streamdata_from_df(update_df) + update_df = self._datastore[self._datastore.index > self._lastidx] + _logger.debug(f"Rows to update: {update_df.shape[0]}") + + if not update_df.empty: + self._lastidx = update_df.index[-1] + data = fp.get_cds_streamdata_from_df(update_df) if data: - _logger.debug(f'Sending stream for figure: {data}') - f.cds.stream(data, self._get_data_stream_length()) - self._lastidx = self._datastore.index[-1] - ''' + _logger.debug(f"Streaming data from update_df: {data}") + fp.cds.stream(data, self._get_data_stream_length()) + log_cds_state(fp.cds, "After streaming update_df") + + for f in fp.figures: + data = f.get_cds_streamdata_from_df(update_df) + if data: + _logger.debug(f"Streaming figure data from update_df: {data}") + f.cds.stream(data, self._get_data_stream_length()) + log_cds_state(f.cds, "After streaming figure update_df") + + _logger.debug(f"Final datastore shape: {self._datastore.shape}") + _logger.debug(f"Final datastore index: {self._datastore.index}") + log_cds_state(fp.cds, "Final state") + + _logger.debug("Finished _cb_push") + def _fill(self): ''' @@ -102,21 +110,24 @@ def _fill(self): fp.set_cds_columns_from_df(self._datastore) def _set_data(self, data, idx=None): - ''' - Replaces or appends data to datastore - ''' + _logger.debug(f"Setting data. Type: {type(data)}, idx: {idx}") if isinstance(data, pd.DataFrame): self._datastore = data self._lastidx = -1 elif isinstance(data, pd.Series): if idx is None: - self._datastore = self._datastore.append(data) + self._datastore = pd.concat([self._datastore, pd.DataFrame(data).T]) else: self._datastore.loc[idx] = data else: raise Exception('Unsupported data provided') - self._datastore = self._datastore.tail( - self._get_data_stream_length()) + + self._datastore = self._datastore.tail(self._get_data_stream_length()) + self._datastore = self._datastore.reset_index(drop=True) + _logger.debug(f"Datastore after setting data: {self._datastore.shape}") + _logger.debug(f"Datastore index after setting data: {self._datastore.index}") + + def _push(self): doc = self._client.get_doc() @@ -127,21 +138,28 @@ def _push(self): self._cb = doc.add_next_tick_callback(self._cb_push) def _process_data(self, data): - ''' - Request to update data with given data - ''' + _logger.debug(f"Starting _process_data. Data shape: {data.shape}") + _logger.debug(f"Data index: {data.index}") + for idx, row in data.iterrows(): - if (idx in self._datastore.index): + _logger.debug(f"Processing row with index {idx}") + if idx in self._datastore.index: + _logger.debug(f"Updating existing row at index {idx}") self._set_data(row, idx) self._patches[idx] = row else: + _logger.debug(f"Adding new row at index {idx}") self._set_data(row) - # if self._datastore is not None: - # self._datastore.drop_duplicates("datetime", keep='last', inplace=True) + if self._datastore is not None: + _logger.debug(f"Datastore before cleanup: {self._datastore.shape}") + self._datastore = self._datastore.drop_duplicates("datetime", keep='last') + self._datastore = self._datastore.reset_index(drop=True) + _logger.debug(f"Datastore after cleanup: {self._datastore.shape}") + _logger.debug(f"Datastore index after cleanup: {self._datastore.index}") self._push() - + def _get_data_stream_length(self): ''' Returns the length of data stream to use diff --git a/btplotting/optbrowser.py b/btplotting/optbrowser.py old mode 100755 new mode 100644