ibmi-brunch-learn

Announcement

Collapse
No announcement yet.

Problem when exchanging between threads via usrq

Collapse
X
 
  • Filter
  • Time
  • Show
Clear All
new posts

  • Problem when exchanging between threads via usrq

    Has anyone here worked with *USRQ? There is a problem when exchanging between threads via usrq
    There are two threads that are quite intensively exchanging messages via KEYED USRQ (one sends messages with the key KEY1 and reads with the key KEY2, the second vice versa).
    The problem is that if all this works in two different jobs, then everything is fine. But when in two different threads of the same task, problems with "sticking" begin - a message that is definitely sent from one thread becomes available for reading from another after a long time (maximum was fixed up to 120 seconds).
    Judging by the stack, both threads are currently in the message waiting state (DEQWAIT) - one thread has sent a message and is waiting for a response message from the second, but the second cannot receive the sent message for a long time.
    I repeat - the problem is only when exchanging messages between two threads within the same job. If the exchange is between two different jobs, everything works very quickly.​

  • #2
    Hi Victor,

    are you sure you are not using any variable declared Static(*ALLTHREAD)?

    Could you post some post example?

    Bye

    Comment


    • #3
      Originally posted by paolinosal View Post
      Hi Victor,

      are you sure you are not using any variable declared Static(*ALLTHREAD)?

      Could you post some post example?

      Bye
      Thanks for the answer.

      I will say right away - this is not my code. I wrote USRQ API (USRQ_xxxx)
      And I have no experience with threads on AS/400.
      I did tests to work in various tasks - everything works fine there.
      Here I am confused by working with the file - there may be problems in this. I'll try to rewrite the code so that only work with the queue remains and see if it helps.â€Ã�? ?‹

      In my tests, time tracking is done a little differently.

      Code:
      **FREE
      ctl-opt main(mainThread) thread(*concurrent) bnddir('QC2LE' :'LESBNDDIR');
      
      /COPY USRQSRC100,USRQRPGP
      /COPY QSYSINC/QRPGLESRC,PTHREAD
      
      dcl-ds DSTS qualified template;
        TS  timestamp;
      end-ds;
      dcl-ds getThread likeds(pthread_t);
      
      dcl-pr qcmdexc extpgm;
        *n char(100) const;
        *n packed(15:5) const;
      end-pr;
      
      dcl-proc mainThread;
        dcl-ds uqMsg likeds(DSTS);
        dcl-s  rc int(10);
        dcl-s  Error char(1024);
        dcl-s  TS2 timestamp;
        dcl-s  Dur int(20);
        dcl-s  hQueue int(10);
      
        dcl-f UQRESP disk(*ext) usage(*update :*output);
        dcl-ds DSUQRESP extname('UQRESP':*output) qualified end-ds;
      
        callp(e) qcmdexc('DLTOBJ AMXSRC/UQTEST *USRQ' :26);
        USRQ_Create('UQTEST' :'AMXSRC' :queKeyd :'Signal USRQ' :26 :4 :128: 128: queSzDFT: Error);
        hQueue = USRQ_Connect('UQTEST' :'AMXSRC' :1 :1 :1000 :*off :Error);
      
        rc = pthread_create(getThread : *OMIT : %paddr(SecondThread) :*null);
      
        DoW *on;
          uqMsg.TS = %timestamp();
          DoU rc > 0;
            rc = USRQ_SendKey(hQueue: uqMsg: 26: 'MAIN': 4: Error);
          EndDo;
      
          DoU rc > 0;
            rc = USRQ_RecvMsg(hQueue :uqMsg :26 :'SCND' :4 :queKeySeqEQ :*omit :*omit :*omit);
          EndDo;
          TS2 = %timestamp();
          DSUQRESP.STRTS = uqMsg.TS;
          DSUQRESP.Dur = %diff(TS2 :uqMsg.TS :*ms);
          write uqrespr DSUQRESP;
        EndDo;
      
        USRQ_Disconnect(hQueue :Error);
        return;
      end-proc;
      
      dcl-proc SecondThread;
        dcl-ds uqMsg likeds(DSTS);
        dcl-s  rc int(10);
        dcl-s Error char(1024);
        dcl-s TS2 timestamp;
        dcl-s Dur int(20);
        dcl-s hQueue int(10);
      
        dcl-f UQRESP disk(*ext) usage(*update :*output);
        dcl-ds DSUQRESP extname('UQRESP':*output) qualified end-ds;
      
        hQueue = USRQ_Connect('UQTEST' :'AMXSRC' :1 :1 :1000 :*off :Error);
      
        DoW *on;
          DoU rc> 0;
            rc = USRQ_RecvMsg(hQueue :uqMsg :26 :'MAIN' :4 :queKeySeqEQ :*omit :*omit :*omit);
          EndDo;
          TS2 = %timestamp();
          DSUQRESP.STRTS = uqMsg.TS;
          DSUQRESP.Dur = %diff(TS2 :uqMsg.TS :*ms);
          write uqrespr DSUQRESP;
      
          uqMsg.TS = %timestamp();
          DoU rc> 0;
            rc = USRQ_SendKey(hQueue: uqMsg: 26: 'SCND': 4: Error);
          EndDo;
        EndDo;
        return;
      End-proc;​

      Comment


      • #4
        Have you tried using two queues? Queue Home sends messages from thread A to B, queue # 2 sends messages from thread B to A.

        Comment


        • #5
          Originally posted by UserName10 View Post
          Have you tried using two queues? Queue Home sends messages from thread A to B, queue # 2 sends messages from thread B to A.
          Yes, I'm thinking of suggesting trying this option. Or try using 2 UNIX sockets

          Comment


          • #6
            Do you really need to use threads instead of a job?

            BTW - "queue home" was intended to be "queue # 1".

            Comment


            • #7
              Originally posted by UserName10 View Post
              Do you really need to use threads instead of a job?

              BTW - "queue home" was intended to be "queue # 1".
              I dont know. I have developed an API to use USRQ. In case of care between multiple jobs, it works great. And this is the main use for us.
              But the question arose - how it will work between several threads in one job. And then there were problems.
              I would like to understand - these are implementation problems, or at the system level. To give guidelines for use and indicate boundary conditions.
              In this particular case, it is possible to organize data exchange through shared memory with the organization of an analogue of a critical section in Windows.​

              Comment


              • #8
                He spent his pure, test. Where there is no work with the table, only with a queue. No problems were found there.

                This is the main thread (with abbreviations, only the main part):

                Code:
                      dcl-c    BUFF_SIZE    const(64);
                      dcl-c    KEY_SIZE     const(6);
                      dcl-c    SNDKEYS      const('SERVER');
                      dcl-c    RCVKEYS      const('CLIENT');
                      dcl-c    SNDKEYC      const('CLIENT');
                      dcl-c    RCVKEYC      const('SERVER');
                      dcl-c    CMDRDY       const('READY');
                      dcl-c    CMDDTA       const('DATA ');
                      dcl-c    CMDEND       const('END  ');
                      dcl-c    TESTCNT      const(500000);
                
                      dcl-ds   t_dsPacket qualified template;
                        cmd               char(5)        inz(*blanks);
                        fromServer        packed(16: 6)  inz(*zero);
                        fromClient        packed(16: 6)  inz(*zero);
                      end-ds;
                
                        queCreated = USRQ_Create('UQRPGT': 'Y2KU': queKeyd: 'USRQ RPG Test': BUFF_SIZE: KEY_SIZE: 1024: 128: queSzMAX: Error);
                        if not queCreated;
                          exsr ExitSr;
                        endif;
                
                        hQueue = USRQ_Connect('UQRPGT': 'Y2KU': 5: 10: 10: *off: Error);        
                        if hQueue < 0;
                          exsr ExitSr;
                        endif;
                
                        nCount = pthread_create(getThread : *omit : %paddr(USRQTCLTH) :*null);
                        if nCount <> 0;
                          exsr ExitSr;
                        endif;
                
                        wait = GetTime();
                
                        dou (GetTime() - wait) > 5;
                          nCount = USRQ_RecvMsg(hQueue: dsPacket: %size(t_dsPacket):
                                                        RCVKEYS: KEY_SIZE: queKeySeqEQ:
                                                        *omit: *omit: Error);
                          select;
                            when nCount < 0;
                              exsr ExitSr;
                            when nCount = %size(t_dsPacket) and
                                 dsPacket.cmd = CMDRDY;
                              cltReady = *on;
                              leave;
                          endsl;
                        enddo;
                
                        if cltReady;
                          wait = GetTime();
                
                          dow nSndPackets < TESTCNT and nRcvPackets < TESTCNT and (GetTime() - wait) < 180;
                            if nSndPackets < TESTCNT;
                              clear dsPacket;
                              dsPacket.cmd        = CMDDTA;
                              dsPacket.fromServer = GetTime();
                              nCount = USRQ_SendMsg(hQueue: dsPacket: %size(t_dsPacket):
                                                            SNDKEYS: KEY_SIZE: Error);
                              if nCount < 0;
                                exsr ExitSr;
                              endif;
                              nSndPackets += 1;
                            endif;
                
                            if nRcvPackets < TESTCNT;
                              nCount = USRQ_RecvMsg(hQueue: dsPacket: %size(t_dsPacket):
                                                            RCVKEYS: KEY_SIZE: queKeySeqEQ:
                                                            *omit: *omit: Error);
                              select;
                                when nCount < 0;
                                  exsr ExitSr;
                                when nCount = %size(t_dsPacket) and
                                     dsPacket.cmd = CMDDTA;
                                  nRcvPackets += 1;
                                  dsPacket.fromClient  = (GetTime() - dsPacket.fromClient) * 1000000;
                                  Packets(nRcvPackets) = dsPacket;
                                  avgSnd              += dsPacket.fromServer;
                                  avgRcv              += dsPacket.fromClient;
                              endsl;
                            endif;
                          enddo;
                
                          clear dsPacket;
                          dsPacket.cmd  = CMDEND;
                          nCount = USRQ_SendMsg(hQueue: dsPacket: %size(t_dsPacket):
                                                        SNDKEYS: KEY_SIZE: Error);
                
                          exsr ExitSr;
                
                        begsr ExitSr;
                          if hQueue > -1;
                            USRQ_Disconnect(hQueue: Error);
                          endif;
                
                          if queCreated;
                            USRQ_Delete('UQRPGT': 'Y2KU': Error);
                          endif;
                          return;
                        endsr;
                ​
                The second thread:

                Code:
                        hQueue = USRQ_Connect('UQRPGT': 'Y2KU': 5: 10: 10: *off: Error);        
                        if hQueue < 0;
                          exsr ExitSr;
                        endif;
                
                        clear dsPacket;
                        dsPacket.cmd = CMDRDY;
                        nCount = USRQ_SendMsg(hQueue: dsPacket: %size(t_dsPacket):
                                                      SNDKEYC: KEY_SIZE: Error);
                        if nCount < 0;
                          exsr ExitSr;
                        endif;
                
                        wait = GetTime();
                        dow not done;
                          nCount = USRQ_RecvMsg(hQueue: dsPacket: %size(t_dsPacket):
                                                        RCVKEYC: KEY_SIZE: queKeySeqEQ:
                                                        *omit: *omit: Error);
                          select;
                            when nCount < 0;
                              exsr ExitSr;
                            when nCount = %size(t_dsPacket);
                              wait = GetTime();
                              select;
                                when dsPacket.cmd = CMDDTA;
                                  dsPacket.fromServer = (GetTime() - dsPacket.fromServer) * 1000000;
                                  dsPacket.fromClient = GetTime();
                                  nCount = USRQ_SendMsg(hQueue: dsPacket: %size(t_dsPacket):
                                                                SNDKEYC: KEY_SIZE: Error);
                                  if nCount < 0;
                                    exsr ExitSr;
                                  endif;
                                when dsPacket.cmd = CMDEND;
                                  done = *on;
                              endsl;
                
                            when (GetTime() - wait) > 30;
                              done = *on;
                          endsl;
                        enddo;
                The average delivery time from the main thread to the second is 10.368708 us, the maximum is 144 us
                From the second to the main - 8.809686 and 153 us, respectively​

                Comment

                Working...
                X