programing

효율적인 순환 버퍼?

powerit 2023. 7. 22. 10:29
반응형

효율적인 순환 버퍼?

저는 (버퍼의 정수 값의 평균을 구하는 것을 목표로) 파이썬에서 효율적인 순환 버퍼를 만들고 싶습니다.

이것이 목록을 사용하여 값을 수집하는 효율적인 방법입니까?

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

무엇이 더 효율적입니까(그리고 그 이유는 무엇이 더 효율적입니까?

다음과 같이 사용할 수 있습니다.maxlen

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

문서에는 다음을 위한 요리법이 있습니다.deque그것은 당신이 원하는 것과 비슷합니다.이것이 가장 효율적이라는 제 주장은 C에서 최고의 코드를 만드는 습관이 있는 믿을 수 없을 정도로 숙련된 팀에 의해 구현된다는 사실에 전적으로 의존합니다.

여기에는 이미 많은 훌륭한 답변이 있지만, 언급된 옵션의 타이밍을 직접 비교할 수 있는 방법을 찾을 수 없었습니다.그러므로 아래의 비교에서 저의 겸손한 시도를 찾아주시기 바랍니다.

할 수 .list, 기반버퍼, acollections.deque- buffer, " "는 " 입니다.Numpy.roll기반 버퍼.

로 고는 다음과 .update메소드는 한 번에 하나의 값만 추가하여 단순성을 유지합니다.

import numpy
import timeit
import collections


class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method

    def update(self, scalar):
        if self.method == self.buffer_methods[0]:
            # Use list
            try:
                self.content.append(scalar)
                self.content.pop(0)
            except AttributeError:
                self.content = [0.] * self.size
        elif self.method == self.buffer_methods[1]:
            # Use collections.deque
            try:
                self.content.append(scalar)
            except AttributeError:
                self.content = collections.deque([0.] * self.size,
                                                 maxlen=self.size)
        elif self.method == self.buffer_methods[2]:
            # Use Numpy.roll
            try:
                self.content = numpy.roll(self.content, -1)
                self.content[-1] = scalar
            except IndexError:
                self.content = numpy.zeros(self.size, dtype=float)

# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                                   buffer_method=method)
                    for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size)
    # Testing
    eval(code)
    buffer_content = [item for item in cb.content]
    assert buffer_content == range(circular_buffer_size)
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
    print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
        cb.method, timeit_results[-1],
        timeit_results[-1] / timeit_iterations * 1e3)

내 시스템에서 다음과 같은 결과가 나옵니다.

list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)

목록의 선두에서 팝업하면 전체 목록이 복사되므로 비효율적입니다.

대신 고정 크기의 목록/어레이와 항목을 추가/제거할 때 버퍼를 통해 이동하는 인덱스를 사용해야 합니다.

MoonCactus의 답변을 바탕으로, 여기에 다음과 같은 것이 있습니다.circularlist과 다른 업수입니다. 그의 버전과 다른 점은 여기서c[0] 요소인,▁the것▁always다▁will▁give니입▁element제공을 제공합니다.c[-1] 최근에 된 요소인,,c[-2]끝자리가...응용 프로그램의 경우 이 방법이 더 자연스럽습니다.

c = circularlist(4)
c.append(1); print(c, c[0], c[-1])    #[1] (1/4 items)              1  1
c.append(2); print(c, c[0], c[-1])    #[1, 2] (2/4 items)           1  2
c.append(3); print(c, c[0], c[-1])    #[1, 2, 3] (3/4 items)        1  3
c.append(8); print(c, c[0], c[-1])    #[1, 2, 3, 8] (4/4 items)     1  8
c.append(10); print(c, c[0], c[-1])   #[2, 3, 8, 10] (4/4 items)    2  10
c.append(11); print(c, c[0], c[-1])   #[3, 8, 10, 11] (4/4 items)   3  11
d = circularlist(4, [1, 2, 3, 4, 5])  #[2, 3, 4, 5]

클래스:

class circularlist(object):
    def __init__(self, size, data = []):
        """Initialization"""
        self.index = 0
        self.size = size
        self._data = list(data)[-size:]

    def append(self, value):
        """Append an element"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        else:
            self._data.append(value)
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Get element by index, relative to the current index"""
        if len(self._data) == self.size:
            return(self._data[(key + self.index) % self.size])
        else:
            return(self._data[key])

    def __repr__(self):
        """Return string representation"""
        return (self._data[self.index:] + self._data[:self.index]).__repr__() + ' (' + str(len(self._data))+'/{} items)'.format(self.size)

deque 클래스를 사용해도 좋습니다. 하지만 질문(평균)의 요구 사항에 대해 이것이 제 해결책입니다.

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14

파이썬의 데크는 느립니다.대신 numpy.roll을 사용할 수도 있습니다. numpy 배열의 모양(n,) 또는 (n,1)에서 숫자를 회전하려면 어떻게 해야 합니까?

이 벤치마크에서 deque는 448ms입니다.Numpy.roll은 29ms http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/ 입니다.

Python Cookbook의 솔루션은 링 버퍼 인스턴스가 가득 찼을 때 재분류를 포함하여 어떻습니까?

class RingBuffer:
    """ class that implements a not-yet-full buffer """
    def __init__(self,size_max):
        self.max = size_max
        self.data = []

    class __Full:
        """ class that implements a full buffer """
        def append(self, x):
            """ Append an element overwriting the oldest one. """
            self.data[self.cur] = x
            self.cur = (self.cur+1) % self.max
        def get(self):
            """ return list of elements in correct order """
            return self.data[self.cur:]+self.data[:self.cur]

    def append(self,x):
        """append an element at the end of the buffer"""
        self.data.append(x)
        if len(self.data) == self.max:
            self.cur = 0
            # Permanently change self's class from non-full to full
            self.__class__ = self.__Full

    def get(self):
        """ Return a list of elements from the oldest to the newest. """
        return self.data

# sample usage
if __name__=='__main__':
    x=RingBuffer(5)
    x.append(1); x.append(2); x.append(3); x.append(4)
    print(x.__class__, x.get())
    x.append(5)
    print(x.__class__, x.get())
    x.append(6)
    print(x.data, x.get())
    x.append(7); x.append(8); x.append(9); x.append(10)
    print(x.data, x.get())

구현에서 주목할 만한 설계 선택은 이러한 객체가 수명의 특정 시점(비완전 버퍼에서 완전 버퍼로)에 비가역적 상태 전환을 겪기 때문입니다(그리고 그 시점에서의 동작 변화). -바꿔서.self.__class__은 두 동일한 슬롯을 한 2들어, 와 Python 2 (RingBuffer 2.2같서합니다니합작잘동를도, RingBuffer 서에클스고래개인두적).__Full이 요리법에서).

인스턴스의 클래스를 변경하는 것은 많은 언어에서 이상할 수 있지만, 이 레시피에서처럼 행동에 크게 영향을 미치는 때때로, 거대하고, 되돌릴 수 없으며, 별개의 상태 변화를 나타내는 다른 방법에 대한 피톤식 대안입니다.파이썬이 모든 종류의 수업에 지원하는 것은 좋은 일입니다.

크레디트: 세바스티앙 킴

이 꽤 오래된 파이썬 레시피도 볼 수 있습니다.

다음은 NumPy 어레이를 사용하는 나만의 버전입니다.

#!/usr/bin/env python

import numpy as np

class RingBuffer(object):
    def __init__(self, size_max, default_value=0.0, dtype=float):
        """initialization"""
        self.size_max = size_max

        self._data = np.empty(size_max, dtype=dtype)
        self._data.fill(default_value)

        self.size = 0

    def append(self, value):
        """append an element"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value 

        self.size += 1

        if self.size == self.size_max:
            self.__class__  = RingBufferFull

    def get_all(self):
        """return a list of elements from the oldest to the newest"""
        return(self._data)

    def get_partial(self):
        return(self.get_all()[0:self.size])

    def __getitem__(self, key):
        """get element"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        s = self._data.__repr__()
        s = s + '\t' + str(self.size)
        s = s + '\t' + self.get_all()[::-1].__repr__()
        s = s + '\t' + self.get_partial()[::-1].__repr__()
        return(s)

class RingBufferFull(RingBuffer):
    def append(self, value):
        """append an element when buffer is full"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value

저는 직렬 프로그래밍을 하기 전에 이런 문제가 있었습니다.1년여 전에는 효율적인 구현을 찾을 수 없었습니다. 그래서 는 C 확장자로 하나를 작성하게 되었고 MIT 라이선스로 Pypi에서도 사용할 수 있게 되었습니다.8비트 서명된 문자의 버퍼만 처리할 수 있는 슈퍼 베이직이지만 길이가 유연하기 때문에 문자가 아닌 다른 것이 필요할 경우 Struct 등을 사용할 수 있습니다.구글 검색을 통해 요즘 몇 가지 옵션이 있다는 것을 알 수 있습니다. 그래서 당신도 그것들을 보는 것이 좋을 것입니다.

Github에서:

class CircularBuffer:

    def __init__(self, size):
        """Store buffer in given storage."""
        self.buffer = [None]*size
        self.low = 0
        self.high = 0
        self.size = size
        self.count = 0

    def isEmpty(self):
        """Determines if buffer is empty."""
        return self.count == 0

    def isFull(self):
        """Determines if buffer is full."""
        return self.count == self.size

    def __len__(self):
        """Returns number of elements in buffer."""
        return self.count

    def add(self, value):
        """Adds value to buffer, overwrite as needed."""
        if self.isFull():
            self.low = (self.low+1) % self.size
        else:
            self.count += 1
        self.buffer[self.high] = value
        self.high = (self.high + 1) % self.size

    def remove(self):
        """Removes oldest value from non-empty buffer."""
        if self.count == 0:
            raise Exception ("Circular Buffer is empty");
        value = self.buffer[self.low]
        self.low = (self.low + 1) % self.size
        self.count -= 1
        return value

    def __iter__(self):
        """Return elements in the circular buffer in order using iterator."""
        idx = self.low
        num = self.count
        while num > 0:
            yield self.buffer[idx]
            idx = (idx + 1) % self.size
            num -= 1

    def __repr__(self):
        """String representation of circular buffer."""
        if self.isEmpty():
            return 'cb:[]'

        return 'cb:[' + ','.join(map(str,self)) + ']'

https://github.com/heineman/python-data-structures/blob/master/2.%20Ubiquitous%20Lists/circBuffer.py

이것은 어떤 라이브러리도 필요하지 않습니다.목록을 만든 다음 인덱스별로 순환합니다.

설치 공간이 매우 작으며(라이브러리 없음) dequeue보다 두 배 이상 빠르게 실행됩니다.이는 실제로 이동 평균을 계산하는 데 유용하지만 위와 같이 항목이 연령별로 정렬되지는 않습니다.

class CircularBuffer(object):
    def __init__(self, size):
        """initialization"""
        self.index= 0
        self.size= size
        self._data = []

    def record(self, value):
        """append an element"""
        if len(self._data) == self.size:
            self._data[self.index]= value
        else:
            self._data.append(value)
        self.index= (self.index + 1) % self.size

    def __getitem__(self, key):
        """get element by index like a regular array"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        return self._data.__repr__() + ' (' + str(len(self._data))+' items)'

    def get_all(self):
        """return a list of all the elements"""
        return(self._data)

평균 값을 구하는 방법(예:

q= CircularBuffer(1000000);
for i in range(40000):
    q.record(i);
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())

결과:

capacity= 1000000
stored= 40000
average= 19999

real 0m0.024s
user 0m0.020s
sys  0m0.000s

이것은 데큐와 동등한 시간의 약 1/3입니다.

여기에는 많은 답변이 있지만 U에 대한 D LeftAdjoint에서 제안한 Numpind 배열을 하위 클래스로 분류하지 않습니다. 이는 효율적으로 확장되지 않는 np.roll을 사용하는 것을 방지하고 배열 슬라이싱과 같은 Numpy 배열의 모든 이점을 전달합니다.Numpy 배열을 사용하면 평균을 포함하여 실행해야 하는 대부분의 분석을 수행할 수 있습니다.

RingArray 클래스

내 솔루션은 Numpy 설명서에 명시된 지침을 사용하여 np.ndarray를 하위 분류합니다.

RingArray는 지정된 모양으로 초기화되고 np.nan 값으로 채워집니다.

이터 도구 주기는 배열에서 편집할 다음 행 위치를 제공하는 1차원 주기를 만드는 데 사용됩니다.이 값은 초기화 중인 어레이의 높이를 기준으로 합니다.

ndarray 방법에 추가 방법이 추가되어 사이클의 다음 위치에 데이터를 씁니다.

class RingArray(np.ndarray):
    """A modified numpy array type that functions like a stack. 
    RingArray has a set size specified during initialisation. 
    Add new data using the append() method, which will replace the 
    next value in a cyclical fashion. The array itself has all the 
    properties of a numpy array e.g. it can be sliced and accessed as 
    normal. Initially fills the array with np.nan values.
    
    Options
    --------
    shape : tuple
        A tuple of (height, width) for the maximum size of the array.

    Attributes
    ----------
    Inherited from nd.array. Initially fills array with np.nan values.
    
    Methods
    --------
    append(data)
        Add/replace data in the next element of the cycle.
        Data should be the length of the RingArray width.
    
    """    
    def __new__(subtype, shape):
        obj = super().__new__(subtype, shape)
        
        obj = np.vectorize(lambda x: np.nan)(obj)
        
        obj._pointer = cycle(np.arange(0, shape[0]))
        
        return obj
    
    # needed by numpy
    def __array_finalize__(self, obj):
         if obj is None: return
        
    # add data to the next element (looped)
    def append(self, data):
        """Adds or replaces data in the RingArray.
        The function writes to the next row in the Array.
        Once the last row is reached, the assignment row 
        loops back to the start.

        Parameters
        ----------
        data : array_like
            Data should be the length of the RingArray width.
        """        
        self[next(self._pointer)] = data

성능

저는 이 방법이 O(1)에서 확장된다고 믿지만, 저는 컴퓨터 과학자가 아니므로 제가 틀렸다면 수정해주세요!

발생 가능한 문제

ndarray의 하위 클래스이므로 해당 클래스의 모든 메서드를 RingArray에서 사용할 수 있습니다.np.delete와 같은 배열 함수를 사용하여 값을 제거하거나 추가하면 배열의 모양이 변경됩니다.이로 인해 초기화 시 설정된 사이클에 오류가 발생합니다.따라서 append() 이외의 다른 방법으로 배열을 편집할 때는 주의해야 합니다.

처음 스택 오버플로 게시물인데, 개선할 점이 있으면 알려주세요 :).

원래 질문은 "효율적인" 순환 버퍼였습니다.요구된 효율성에 따르면, 아론스터링의 대답은 확실히 맞는 것 같습니다.Python으로 프로그래밍된 전용 클래스를 사용하고 collections.deque와 시간 처리를 비교하면 deque와 함께 x5.2배의 가속을 보여줍니다!이를 테스트하기 위한 매우 간단한 코드는 다음과 같습니다.

class cb:
    def __init__(self, size):
        self.b = [0]*size
        self.i = 0
        self.sz = size
    def append(self, v):
        self.b[self.i] = v
        self.i = (self.i + 1) % self.sz

b = cb(1000)
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 1.097 second on my laptop

from collections import deque
b = deque( [], 1000 )
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 0.211 second on my laptop

데케를 리스트로 변환하려면 다음을 사용합니다.

my_list = [v for v in my_deque]

그러면 데큐 항목에 O(1) 랜덤 액세스 권한을 얻을 수 있습니다.물론, 이것은 한 번 설정한 후에 데큐에 대한 많은 랜덤 액세스를 수행해야 하는 경우에만 유용합니다.

이것은 가장 최근의 텍스트 메시지를 저장하기 위한 일부 버퍼에 동일한 주체를 적용하는 것입니다.

import time
import datetime
import sys, getopt

class textbffr(object):
    def __init__(self, size_max):
        #initialization
        self.posn_max = size_max-1
        self._data = [""]*(size_max)
        self.posn = self.posn_max

    def append(self, value):
        #append an element
        if self.posn == self.posn_max:
            self.posn = 0
            self._data[self.posn] = value   
        else:
            self.posn += 1
            self._data[self.posn] = value

    def __getitem__(self, key):
        #return stored element
        if (key + self.posn+1) > self.posn_max:
            return(self._data[key - (self.posn_max-self.posn)])
        else:
            return(self._data[key + self.posn+1])


def print_bffr(bffr,bffer_max): 
    for ind in range(0,bffer_max):
        stored = bffr[ind]
        if stored != "":
            print(stored)
    print ( '\n' )

def make_time_text(time_value):
    return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
      + str(time_value.hour).zfill(2) +  str(time_value.minute).zfill(2)
      + str(time_value.second).zfill(2))


def main(argv):
    #Set things up 
    starttime = datetime.datetime.now()
    log_max = 5
    status_max = 7
    log_bffr = textbffr(log_max)
    status_bffr = textbffr(status_max)
    scan_count = 1

    #Main Loop
    # every 10 secounds write a line with the time and the scan count.
    while True: 

        time_text = make_time_text(datetime.datetime.now())
        #create next messages and store in buffers
        status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
        log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")

        #print whole buffers so far
        print_bffr(log_bffr,log_max)
        print_bffr(status_bffr,status_max)

        time.sleep(2)
        scan_count += 1 

if __name__ == '__main__':
    main(sys.argv[1:])  

저는 여기서 답을 이해하지요.분명히 NumPy 내에서 작업하는 경우 어레이 또는 nandarray(일반적으로)를 하위 클래스로 분류할 수 있습니다. 그런 방식으로(적어도 순환 어레이가 가득 차면) 순환 어레이에서 NumPy 어레이 산술 연산을 계속 사용할 수 있습니다.주의해야 할 점은 여러 구성 요소(예: 이동 평균)에 걸쳐 있는 작업의 경우 버퍼에 누적된 창보다 창이 크지 않다는 것입니다.

또한, 모든 논평자들이 언급했듯이, 롤링은 효율성의 목적에 어긋나기 때문에 사용하지 마세요.확장 어레이가 필요한 경우 크기 조정이 필요할 때마다 크기를 두 배로 늘리기만 하면 됩니다(이것은 주기적인 어레이 구현과는 다릅니다).

언급URL : https://stackoverflow.com/questions/4151320/efficient-circular-buffer

반응형